Remove dependency on pipe events in HttpConnection (#11132)
- Refactored the HttpConnection to not depend on OnReaderCompleted and OnWriterCompleted. Instead we use ConnectionClosed to detect the FIN that we propagate via ConnectedAborted. - Fire ConnectionClosed when a FIN is received from both transports. - Remove pipe completion from Http1Connection and Http1OutputProducer. Instead just return from request processing. - Cancel the transport input on RawStream to yield the pending read. This is much more efficient than passing a cancellation token into everything (and more reliable) - Fixed the RequestTests to not depend on inline scheduling - Properly dispose things in the LibuvOutputConsumerTests - Skipped flaky test
This commit is contained in:
parent
874050f1dd
commit
b0be780f1b
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
|
|
@ -14,7 +15,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
{
|
||||
private readonly int _minAllocBufferSize;
|
||||
|
||||
private readonly IDuplexPipe _transport;
|
||||
private Task _inputTask;
|
||||
private Task _outputTask;
|
||||
|
||||
public AdaptedPipeline(IDuplexPipe transport,
|
||||
Pipe inputPipe,
|
||||
|
|
@ -22,13 +24,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
IKestrelTrace log,
|
||||
int minAllocBufferSize)
|
||||
{
|
||||
_transport = transport;
|
||||
TransportStream = new RawStream(transport.Input, transport.Output, throwOnCancelled: true);
|
||||
Input = inputPipe;
|
||||
Output = outputPipe;
|
||||
Log = log;
|
||||
_minAllocBufferSize = minAllocBufferSize;
|
||||
}
|
||||
|
||||
public RawStream TransportStream { get; }
|
||||
|
||||
public Pipe Input { get; }
|
||||
|
||||
public Pipe Output { get; }
|
||||
|
|
@ -39,13 +43,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
|
||||
PipeWriter IDuplexPipe.Output => Output.Writer;
|
||||
|
||||
public async Task RunAsync(Stream stream)
|
||||
public void RunAsync(Stream stream)
|
||||
{
|
||||
var inputTask = ReadInputAsync(stream);
|
||||
var outputTask = WriteOutputAsync(stream);
|
||||
_inputTask = ReadInputAsync(stream);
|
||||
_outputTask = WriteOutputAsync(stream);
|
||||
}
|
||||
|
||||
await inputTask;
|
||||
await outputTask;
|
||||
public async Task CompleteAsync()
|
||||
{
|
||||
Output.Writer.Complete();
|
||||
Input.Reader.Complete();
|
||||
|
||||
if (_outputTask == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for the output task to complete, this ensures that we've copied
|
||||
// the application data to the underlying stream
|
||||
await _outputTask;
|
||||
|
||||
// Cancel the underlying stream so that the input task yields
|
||||
TransportStream.CancelPendingRead();
|
||||
|
||||
// The input task should yield now that we've cancelled it
|
||||
await _inputTask;
|
||||
}
|
||||
|
||||
private async Task WriteOutputAsync(Stream stream)
|
||||
|
|
@ -97,7 +119,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
finally
|
||||
{
|
||||
Output.Reader.Complete();
|
||||
_transport.Output.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +136,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
|
||||
while (true)
|
||||
{
|
||||
|
||||
var outputBuffer = Input.Writer.GetMemory(_minAllocBufferSize);
|
||||
var bytesRead = await stream.ReadAsync(outputBuffer);
|
||||
Input.Writer.Advance(bytesRead);
|
||||
|
|
@ -134,6 +154,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
// Propagate the exception if it's ConnectionAbortedException
|
||||
error = ex as ConnectionAbortedException;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
|
||||
|
|
@ -142,9 +167,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
finally
|
||||
{
|
||||
Input.Writer.Complete(error);
|
||||
// The application could have ended the input pipe so complete
|
||||
// the transport pipe as well
|
||||
_transport.Input.Complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,11 +14,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
{
|
||||
private readonly PipeReader _input;
|
||||
private readonly PipeWriter _output;
|
||||
private readonly bool _throwOnCancelled;
|
||||
private volatile bool _cancelCalled;
|
||||
|
||||
public RawStream(PipeReader input, PipeWriter output)
|
||||
public RawStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
|
||||
{
|
||||
_input = input;
|
||||
_output = output;
|
||||
_throwOnCancelled = throwOnCancelled;
|
||||
}
|
||||
|
||||
public void CancelPendingRead()
|
||||
{
|
||||
_cancelCalled = true;
|
||||
_input.CancelPendingRead();
|
||||
}
|
||||
|
||||
public override bool CanRead => true;
|
||||
|
|
@ -61,17 +70,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
{
|
||||
// ValueTask uses .GetAwaiter().GetResult() if necessary
|
||||
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
|
||||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).Result;
|
||||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default).Result;
|
||||
}
|
||||
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).AsTask();
|
||||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
|
||||
}
|
||||
|
||||
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return ReadAsyncInternal(destination);
|
||||
return ReadAsyncInternal(destination, cancellationToken);
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
|
|
@ -105,14 +114,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
return WriteAsync(null, 0, 0, cancellationToken);
|
||||
}
|
||||
|
||||
private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination)
|
||||
private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var result = await _input.ReadAsync();
|
||||
var result = await _input.ReadAsync(cancellationToken);
|
||||
var readableBuffer = result.Buffer;
|
||||
try
|
||||
{
|
||||
if (_throwOnCancelled && result.IsCanceled && _cancelCalled)
|
||||
{
|
||||
// Reset the bool
|
||||
_cancelCalled = false;
|
||||
throw new OperationCanceledException();
|
||||
}
|
||||
|
||||
if (!readableBuffer.IsEmpty)
|
||||
{
|
||||
// buffer.Count is int
|
||||
|
|
|
|||
|
|
@ -68,8 +68,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
protected override void OnRequestProcessingEnded()
|
||||
{
|
||||
Input.Complete();
|
||||
|
||||
TimeoutControl.StartDrainTimeout(MinResponseDataRate, ServerOptions.Limits.MaxResponseBufferSize);
|
||||
|
||||
// Prevent RequestAborted from firing. Free up unneeded feature references.
|
||||
|
|
|
|||
|
|
@ -86,6 +86,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
_memoryPool = memoryPool;
|
||||
}
|
||||
|
||||
// For tests
|
||||
internal PipeWriter PipeWriter => _pipeWriter;
|
||||
|
||||
public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
|
|
@ -402,7 +405,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
_log.ConnectionDisconnect(_connectionId);
|
||||
_pipeWriterCompleted = true;
|
||||
_completed = true;
|
||||
_pipeWriter.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -82,7 +82,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
try
|
||||
{
|
||||
AdaptedPipeline adaptedPipeline = null;
|
||||
var adaptedPipelineTask = Task.CompletedTask;
|
||||
|
||||
// _adaptedTransport must be set prior to wiring up callbacks
|
||||
// to allow the connection to be aborted prior to protocol selection.
|
||||
|
|
@ -120,8 +119,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
if (adaptedPipeline != null)
|
||||
{
|
||||
// Stream can be null here and run async will close the connection in that case
|
||||
var stream = await ApplyConnectionAdaptersAsync();
|
||||
adaptedPipelineTask = adaptedPipeline.RunAsync(stream);
|
||||
var stream = await ApplyConnectionAdaptersAsync(adaptedPipeline.TransportStream);
|
||||
adaptedPipeline.RunAsync(stream);
|
||||
}
|
||||
|
||||
IRequestProcessor requestProcessor = null;
|
||||
|
|
@ -160,20 +159,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
}
|
||||
}
|
||||
|
||||
_context.Transport.Input.OnWriterCompleted(
|
||||
(_, state) => ((HttpConnection)state).OnInputOrOutputCompleted(),
|
||||
this);
|
||||
var closedRegistration = _context.ConnectionContext.ConnectionClosed.Register(state => ((HttpConnection)state).OnInputOrOutputCompleted(), this);
|
||||
|
||||
_context.Transport.Output.OnReaderCompleted(
|
||||
(_, state) => ((HttpConnection)state).OnInputOrOutputCompleted(),
|
||||
this);
|
||||
|
||||
if (requestProcessor != null)
|
||||
// We don't care about callbacks once all requests are processed
|
||||
using (closedRegistration)
|
||||
{
|
||||
await requestProcessor.ProcessRequestsAsync(httpApplication);
|
||||
if (requestProcessor != null)
|
||||
{
|
||||
await requestProcessor.ProcessRequestsAsync(httpApplication);
|
||||
}
|
||||
}
|
||||
|
||||
await adaptedPipelineTask;
|
||||
// Complete the pipeline after the method runs
|
||||
await (adaptedPipeline?.CompleteAsync() ?? Task.CompletedTask);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
|
@ -277,10 +275,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
}
|
||||
}
|
||||
|
||||
private async Task<Stream> ApplyConnectionAdaptersAsync()
|
||||
private async Task<Stream> ApplyConnectionAdaptersAsync(RawStream stream)
|
||||
{
|
||||
var connectionAdapters = _context.ConnectionAdapters;
|
||||
var stream = new RawStream(_context.Transport.Input, _context.Transport.Output);
|
||||
var adapterContext = new ConnectionAdapterContext(_context.ConnectionContext, stream);
|
||||
_adaptedConnections = new List<IAdaptedConnection>(connectionAdapters.Count);
|
||||
|
||||
|
|
@ -367,12 +364,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
|
||||
private void CloseUninitializedConnection(ConnectionAbortedException abortReason)
|
||||
{
|
||||
Debug.Assert(_adaptedTransport != null);
|
||||
|
||||
_context.ConnectionContext.Abort(abortReason);
|
||||
|
||||
_adaptedTransport.Input.Complete();
|
||||
_adaptedTransport.Output.Complete();
|
||||
}
|
||||
|
||||
public void OnTimeout(TimeoutReason reason)
|
||||
|
|
|
|||
|
|
@ -47,9 +47,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
await socketOutput.WriteDataAsync(new byte[] { 1, 2, 3, 4 }, default);
|
||||
|
||||
Assert.True(socketOutput.Pipe.Reader.TryRead(out var result));
|
||||
Assert.True(result.IsCompleted);
|
||||
Assert.True(result.Buffer.IsEmpty);
|
||||
Assert.False(socketOutput.Pipe.Reader.TryRead(out var result));
|
||||
|
||||
socketOutput.Pipe.Writer.Complete();
|
||||
socketOutput.Pipe.Reader.Complete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
|
||||
private MemoryHandle _bufferHandle;
|
||||
private Task _processingTask;
|
||||
private readonly TaskCompletionSource<object> _waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private bool _connectionClosed;
|
||||
|
||||
public LibuvConnection(UvStreamHandle socket,
|
||||
ILibuvTrace log,
|
||||
|
|
@ -135,21 +137,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
// We're done with the socket now
|
||||
_socket.Dispose();
|
||||
|
||||
// Fire the connection closed token and wait for it to complete
|
||||
var waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
// Ensure this always fires
|
||||
FireConnectionClosed();
|
||||
|
||||
ThreadPool.UnsafeQueueUserWorkItem(state =>
|
||||
{
|
||||
(var connection, var tcs) = state;
|
||||
|
||||
connection.CancelConnectionClosedToken();
|
||||
|
||||
tcs.TrySetResult(null);
|
||||
},
|
||||
(this, waitForConnectionClosedTcs),
|
||||
preferLocal: false);
|
||||
|
||||
await waitForConnectionClosedTcs.Task;
|
||||
await _waitForConnectionClosedTcs.Task;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
|
|
@ -241,11 +232,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
error = LogAndWrapReadError(uvError);
|
||||
}
|
||||
|
||||
FireConnectionClosed();
|
||||
|
||||
// Complete after aborting the connection
|
||||
Input.Complete(error);
|
||||
}
|
||||
}
|
||||
|
||||
private void FireConnectionClosed()
|
||||
{
|
||||
// Guard against scheduling this multiple times
|
||||
if (_connectionClosed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_connectionClosed = true;
|
||||
|
||||
ThreadPool.UnsafeQueueUserWorkItem(state =>
|
||||
{
|
||||
state.CancelConnectionClosedToken();
|
||||
|
||||
state._waitForConnectionClosedTcs.TrySetResult(null);
|
||||
},
|
||||
this,
|
||||
preferLocal: false);
|
||||
}
|
||||
|
||||
private async Task ApplyBackpressureAsync(ValueTask<FlushResult> flushTask)
|
||||
{
|
||||
Log.ConnectionPause(ConnectionId);
|
||||
|
|
|
|||
|
|
@ -75,8 +75,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
// At least one run of this test should have a MaxResponseBufferSize < 1 MB.
|
||||
var bufferSize = 1024 * 1024;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
|
@ -113,8 +114,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
// Don't want to allocate anything too huge for perf. This is at least larger than the default buffer.
|
||||
var bufferSize = 1024 * 1024;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
|
@ -163,8 +165,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = 1;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
||||
|
|
@ -221,8 +224,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize - 1;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
||||
|
|
@ -287,8 +291,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize / 2;
|
||||
var data = new byte[bufferSize];
|
||||
var halfWriteBehindBuffer = new ArraySegment<byte>(data, 0, bufferSize);
|
||||
|
|
@ -355,8 +360,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions, abortedSource))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions, abortedSource))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize - 1;
|
||||
|
||||
var data = new byte[bufferSize];
|
||||
|
|
@ -450,8 +456,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize - 1;
|
||||
|
||||
var data = new byte[bufferSize];
|
||||
|
|
@ -536,8 +543,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize;
|
||||
|
||||
var data = new byte[bufferSize];
|
||||
|
|
@ -620,8 +628,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
var bufferSize = maxResponseBufferSize - 1;
|
||||
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
|
||||
|
||||
|
|
@ -683,8 +692,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
useSynchronizationContext: false
|
||||
);
|
||||
|
||||
using (var outputProducer = CreateOutputProducer(pipeOptions))
|
||||
await using (var processor = CreateOutputProducer(pipeOptions))
|
||||
{
|
||||
var outputProducer = processor.OutputProducer;
|
||||
_mockLibuv.KestrelThreadBlocker.Reset();
|
||||
|
||||
var buffer = new ArraySegment<byte>(new byte[1]);
|
||||
|
|
@ -712,7 +722,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
}
|
||||
}
|
||||
|
||||
private Http1OutputProducer CreateOutputProducer(PipeOptions pipeOptions, CancellationTokenSource cts = null)
|
||||
private LibuvOuputProcessor CreateOutputProducer(PipeOptions pipeOptions, CancellationTokenSource cts = null)
|
||||
{
|
||||
var pair = DuplexPipe.CreateConnectionPair(pipeOptions, pipeOptions);
|
||||
|
||||
|
|
@ -745,9 +755,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
http1Connection.RequestAborted.Register(cts.Cancel);
|
||||
}
|
||||
|
||||
var ignore = WriteOutputAsync(consumer, pair.Application.Input, http1Connection);
|
||||
var outputTask = WriteOutputAsync(consumer, pair.Application.Input, http1Connection);
|
||||
|
||||
return (Http1OutputProducer)http1Connection.Output;
|
||||
var processor = new LibuvOuputProcessor
|
||||
{
|
||||
ProcessingTask = outputTask,
|
||||
OutputProducer = (Http1OutputProducer)http1Connection.Output
|
||||
};
|
||||
|
||||
return processor;
|
||||
}
|
||||
|
||||
private class LibuvOuputProcessor
|
||||
{
|
||||
public Http1OutputProducer OutputProducer { get; set; }
|
||||
public Task ProcessingTask { get; set; }
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
OutputProducer.PipeWriter.Complete();
|
||||
|
||||
await ProcessingTask;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task WriteOutputAsync(LibuvOutputConsumer consumer, PipeReader outputReader, Http1Connection http1Connection)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
private volatile bool _socketDisposed;
|
||||
private volatile Exception _shutdownReason;
|
||||
private Task _processingTask;
|
||||
private readonly TaskCompletionSource<object> _waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private bool _connectionClosed;
|
||||
|
||||
internal SocketConnection(Socket socket,
|
||||
MemoryPool<byte> memoryPool,
|
||||
|
|
@ -98,22 +100,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
|
||||
_receiver.Dispose();
|
||||
_sender.Dispose();
|
||||
|
||||
// Fire the connection closed token and wait for it to complete
|
||||
var waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
ThreadPool.UnsafeQueueUserWorkItem(state =>
|
||||
{
|
||||
(var connection, var tcs) = state;
|
||||
|
||||
connection.CancelConnectionClosedToken();
|
||||
|
||||
tcs.TrySetResult(null);
|
||||
},
|
||||
(this, waitForConnectionClosedTcs),
|
||||
preferLocal: false);
|
||||
|
||||
await waitForConnectionClosedTcs.Task;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
|
@ -187,6 +173,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
|
||||
Input.Complete(_shutdownReason ?? error);
|
||||
|
||||
FireConnectionClosed();
|
||||
|
||||
await _waitForConnectionClosedTcs.Task;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -307,6 +297,26 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
}
|
||||
}
|
||||
|
||||
private void FireConnectionClosed()
|
||||
{
|
||||
// Guard against scheduling this multiple times
|
||||
if (_connectionClosed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_connectionClosed = true;
|
||||
|
||||
ThreadPool.UnsafeQueueUserWorkItem(state =>
|
||||
{
|
||||
state.CancelConnectionClosedToken();
|
||||
|
||||
state._waitForConnectionClosedTcs.TrySetResult(null);
|
||||
},
|
||||
this,
|
||||
preferLocal: false);
|
||||
}
|
||||
|
||||
private void Shutdown(Exception shutdownReason)
|
||||
{
|
||||
lock (_shutdownLock)
|
||||
|
|
|
|||
|
|
@ -407,7 +407,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
|
||||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
var actual = await _innerStream.ReadAsync(buffer, offset, count);
|
||||
var actual = await _innerStream.ReadAsync(buffer, offset, count, cancellationToken);
|
||||
|
||||
BytesRead += actual;
|
||||
|
||||
|
|
|
|||
|
|
@ -840,12 +840,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Fact(Skip = "This test is racy and requires a product change.")]
|
||||
public async Task ConnectionClosesWhenFinReceivedBeforeRequestCompletes()
|
||||
{
|
||||
var testContext = new TestServiceContext(LoggerFactory)
|
||||
{
|
||||
// FIN callbacks are scheduled so run inline to make this test more reliable
|
||||
Scheduler = PipeScheduler.Inline
|
||||
};
|
||||
|
||||
|
|
@ -856,6 +855,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
await connection.Send(
|
||||
"POST / HTTP/1.1");
|
||||
connection.ShutdownSend();
|
||||
await connection.TransportConnection.WaitForCloseTask;
|
||||
await connection.ReceiveEnd();
|
||||
}
|
||||
|
||||
|
|
@ -866,6 +866,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
"Host:",
|
||||
"Content-Length: 7");
|
||||
connection.ShutdownSend();
|
||||
await connection.TransportConnection.WaitForCloseTask;
|
||||
await connection.ReceiveEnd();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2483,16 +2483,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
[Fact]
|
||||
public async Task AppAbortViaIConnectionLifetimeFeatureIsLogged()
|
||||
{
|
||||
// Ensure the response doesn't get flush before the abort is observed by scheduling inline.
|
||||
var testContext = new TestServiceContext(LoggerFactory)
|
||||
{
|
||||
Scheduler = PipeScheduler.Inline
|
||||
};
|
||||
var testContext = new TestServiceContext(LoggerFactory);
|
||||
|
||||
await using (var server = new TestServer(httpContext =>
|
||||
{
|
||||
httpContext.Features.Get<IConnectionLifetimeFeature>().Abort();
|
||||
return Task.CompletedTask;
|
||||
var feature = httpContext.Features.Get<IConnectionLifetimeFeature>();
|
||||
feature.Abort();
|
||||
|
||||
// Ensure the response doesn't get flush before the abort is observed.
|
||||
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
feature.ConnectionClosed.Register(() => tcs.TrySetResult(null));
|
||||
|
||||
return tcs.Task;
|
||||
}, testContext))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
|
|
|
|||
|
|
@ -48,12 +48,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTrans
|
|||
|
||||
public ConnectionAbortedException AbortReason { get; private set; }
|
||||
|
||||
public Task WaitForCloseTask => _waitForCloseTcs.Task;
|
||||
|
||||
public override void Abort(ConnectionAbortedException abortReason)
|
||||
{
|
||||
_logger.LogDebug(@"Connection id ""{ConnectionId}"" closing because: ""{Message}""", ConnectionId, abortReason?.Message);
|
||||
|
||||
Input.Complete(abortReason);
|
||||
|
||||
OnClosed();
|
||||
|
||||
AbortReason = abortReason;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue