diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs index b73cbdb50e..cf2778603d 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { @@ -16,17 +15,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal private const int MinAllocBufferSize = 2048; private readonly IKestrelTrace _trace; - private readonly IPipeWriter _transportOutputPipeWriter; + private readonly IPipe _transportOutputPipe; private readonly IPipeReader _transportInputPipeReader; public AdaptedPipeline(IPipeReader transportInputPipeReader, - IPipeWriter transportOutputPipeWriter, + IPipe transportOutputPipe, IPipe inputPipe, IPipe outputPipe, IKestrelTrace trace) { _transportInputPipeReader = transportInputPipeReader; - _transportOutputPipeWriter = transportOutputPipeWriter; + _transportOutputPipe = transportOutputPipe; Input = inputPipe; Output = outputPipe; _trace = trace; @@ -58,18 +57,24 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal while (true) { - var readResult = await Output.Reader.ReadAsync(); - var buffer = readResult.Buffer; + var result = await Output.Reader.ReadAsync(); + var buffer = result.Buffer; try { - if (buffer.IsEmpty && readResult.IsCompleted) + if (result.IsCancelled) { + // Forward the cancellation to the transport pipe + _transportOutputPipe.Reader.CancelPendingRead(); break; } if (buffer.IsEmpty) { + if (result.IsCompleted) + { + break; + } await stream.FlushAsync(); } else if (buffer.IsSingleSpan) @@ -99,7 +104,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal finally { Output.Reader.Complete(); - _transportOutputPipeWriter.Complete(error); + _transportOutputPipe.Writer.Complete(error); } } @@ -111,8 +116,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { if (stream == null) { - // If the stream is null then we're going to abort the connection - throw new ConnectionAbortedException(); + // REVIEW: Do we need an exception here? + return; } while (true) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs index 523af72020..ea3ba6e631 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs @@ -47,16 +47,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal ConnectionId = connectionId, FrameConnectionId = frameConnectionId, ServiceContext = _serviceContext, - PipeFactory = connectionInfo.PipeFactory, + ConnectionInformation = connectionInfo, ConnectionAdapters = _listenOptions.ConnectionAdapters, Frame = frame, Input = inputPipe, Output = outputPipe, }); - _serviceContext.Log.ConnectionStart(connectionId); - KestrelEventSource.Log.ConnectionStart(connection, connectionInfo); - // Since data cannot be added to the inputPipe by the transport until OnConnection returns, // Frame.ProcessRequestsAsync is guaranteed to unblock the transport thread before calling // application code. diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index b25f7a983d..8ca4c36372 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public IPipeWriter Input => _context.Input.Writer; public IPipeReader Output => _context.Output.Reader; - private PipeFactory PipeFactory => _context.PipeFactory; + private PipeFactory PipeFactory => _context.ConnectionInformation.PipeFactory; // Internal for testing internal PipeOptions AdaptedInputPipeOptions => new PipeOptions @@ -70,21 +70,24 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { try { + Log.ConnectionStart(ConnectionId); + KestrelEventSource.Log.ConnectionStart(this, _context.ConnectionInformation); + AdaptedPipeline adaptedPipeline = null; var adaptedPipelineTask = Task.CompletedTask; var input = _context.Input.Reader; - var output = _context.Output.Writer; + var output = _context.Output; if (_connectionAdapters.Count > 0) { - adaptedPipeline = new AdaptedPipeline(_context.Input.Reader, - _context.Output.Writer, + adaptedPipeline = new AdaptedPipeline(input, + output, PipeFactory.Create(AdaptedInputPipeOptions), PipeFactory.Create(AdaptedOutputPipeOptions), Log); input = adaptedPipeline.Input.Reader; - output = adaptedPipeline.Output.Writer; + output = adaptedPipeline.Output; } // Set these before the first await, this is to make sure that we don't yield control @@ -114,6 +117,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId); DisposeAdaptedConnections(); + + Log.ConnectionStop(ConnectionId); + KestrelEventSource.Log.ConnectionStop(this); } } @@ -122,8 +128,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal // Abort the connection (if not already aborted) _frame.Abort(ex); - Log.ConnectionStop(ConnectionId); - KestrelEventSource.Log.ConnectionStop(this); _socketClosedTcs.TrySetResult(null); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs index d9a7c38148..24c30becbe 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -13,8 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public string ConnectionId { get; set; } public long FrameConnectionId { get; set; } public ServiceContext ServiceContext { get; set; } - public PipeFactory PipeFactory { get; set; } public List ConnectionAdapters { get; set; } + public IConnectionInformation ConnectionInformation { get; set; } public Frame Frame { get; set; } public IPipe Input { get; set; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index 0d024d6a7f..0f72b16648 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -6,7 +6,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.Extensions.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http @@ -21,10 +20,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // This locks access to to all of the below fields private readonly object _contextLock = new object(); - private bool _cancelled = false; private bool _completed = false; - private readonly IPipeWriter _pipe; + private readonly IPipe _pipe; // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush @@ -33,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private readonly object _flushLock = new object(); private Action _flushCompleted; - public OutputProducer(IPipeWriter pipe, string connectionId, IKestrelTrace log) + public OutputProducer(IPipe pipe, string connectionId, IKestrelTrace log) { _pipe = pipe; _connectionId = connectionId; @@ -50,13 +48,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { if (cancellationToken.IsCancellationRequested) { - _cancelled = true; return Task.FromCanceled(cancellationToken); } - else if (_cancelled) - { - return TaskCache.CompletedTask; - } return WriteAsync(buffer, cancellationToken, chunk); } @@ -80,7 +73,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http return; } - var buffer = _pipe.Alloc(1); + var buffer = _pipe.Writer.Alloc(1); callback(buffer, state); buffer.Commit(); } @@ -97,7 +90,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _log.ConnectionDisconnect(_connectionId); _completed = true; - _pipe.Complete(); + _pipe.Writer.Complete(); } } @@ -112,7 +105,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _log.ConnectionDisconnect(_connectionId); _completed = true; - _pipe.Complete(new ConnectionAbortedException()); + _pipe.Reader.CancelPendingRead(); } } @@ -130,7 +123,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http return TaskCache.CompletedTask; } - writableBuffer = _pipe.Alloc(1); + writableBuffer = _pipe.Writer.Alloc(1); var writer = new WritableBufferWriter(writableBuffer); if (buffer.Count > 0) { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index 30b151a2c2..97cfbd79cd 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -37,50 +37,47 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal while (true) { + var result = await _pipe.ReadAsync(); + var buffer = result.Buffer; + var consumed = buffer.End; + try { - var result = await _pipe.ReadAsync(); - var buffer = result.Buffer; - var consumed = buffer.End; - - try + if (result.IsCancelled) { - if (!buffer.IsEmpty) + break; + } + + if (!buffer.IsEmpty) + { + var writeReq = pool.Allocate(); + + try { - var writeReq = pool.Allocate(); + var writeResult = await writeReq.WriteAsync(_socket, buffer); - try + LogWriteInfo(writeResult.Status, writeResult.Error); + + if (writeResult.Error != null) { - var writeResult = await writeReq.WriteAsync(_socket, buffer); - - LogWriteInfo(writeResult.Status, writeResult.Error); - - if (writeResult.Error != null) - { - consumed = buffer.Start; - throw writeResult.Error; - } - } - finally - { - // Make sure we return the writeReq to the pool - pool.Return(writeReq); + consumed = buffer.Start; + throw writeResult.Error; } } - - if (buffer.IsEmpty && result.IsCompleted) + finally { - break; + // Make sure we return the writeReq to the pool + pool.Return(writeReq); } } - finally + else if (result.IsCompleted) { - _pipe.Advance(consumed); + break; } } - catch (ConnectionAbortedException) + finally { - break; + _pipe.Advance(consumed); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs index 28685c3b60..ff0a0c2202 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs @@ -167,6 +167,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets var result = await _output.ReadAsync(); var buffer = result.Buffer; + if (result.IsCancelled) + { + break; + } + try { if (!buffer.IsEmpty) @@ -189,15 +194,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } } } - - if (result.IsCancelled) - { - // Send a FIN - _socket.Shutdown(SocketShutdown.Send); - break; - } - - if (buffer.IsEmpty && result.IsCompleted) + else if (result.IsCompleted) { break; } @@ -207,6 +204,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets _output.Advance(buffer.End); } } + + _socket.Shutdown(SocketShutdown.Send); } catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) { @@ -216,10 +215,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { error = null; } - catch (ConnectionAbortedException) - { - error = null; - } catch (IOException ex) { error = ex; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs index eb2c93f583..51ae37a73a 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs @@ -70,7 +70,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests TimeoutControl = Mock.Of() }; - _frame.Output = new OutputProducer(output.Writer, "", Mock.Of()); + _frame.Output = new OutputProducer(output, "", Mock.Of()); _frame.Reset(); } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs index 1e8ae196bd..1ec0f7656c 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs @@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests var pipe = _pipeFactory.Create(pipeOptions); var serviceContext = new TestServiceContext(); var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); - var socketOutput = new OutputProducer(pipe.Writer, "0", serviceContext.Log); + var socketOutput = new OutputProducer(pipe, "0", serviceContext.Log); return socketOutput; } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs index bef07b9063..74bbdc19c5 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs @@ -109,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance Input = input.Reader, }; - frame.Output = new OutputProducer(output.Writer, "", null); + frame.Output = new OutputProducer(output, "", null); frame.Reset(); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs index 8f28af9ef6..483705f3a2 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs @@ -131,7 +131,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance { Input = input.Reader, }; - frame.Output = new OutputProducer(output.Writer, "", null); + frame.Output = new OutputProducer(output, "", null); frame.Reset(); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index 13b3706657..16f3361be8 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -392,6 +392,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.True(task3Canceled.IsCanceled); Assert.True(abortedSource.IsCancellationRequested); + + await _mockLibuv.OnPostTask; + + // Complete the 4th write + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } } }); } @@ -467,6 +475,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.True(task3Canceled.IsCanceled); Assert.True(abortedSource.IsCancellationRequested); + + await _mockLibuv.OnPostTask; + + // Complete the 4th write + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } } }); } @@ -544,6 +560,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // Third task is now canceled await Assert.ThrowsAsync(() => task3Canceled); Assert.True(task3Canceled.IsCanceled); + + await _mockLibuv.OnPostTask; + + // Complete the 4th write + while (completeQueue.TryDequeue(out var triggerNextCompleted)) + { + await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); + } } }); } @@ -586,6 +610,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var writeTask2 = socketOutput.WriteAsync(buffer); var writeTask3 = socketOutput.WriteAsync(buffer); + await _mockLibuv.OnPostTask; + // Drain the write queue while (completeQueue.TryDequeue(out var triggerNextCompleted)) { @@ -670,7 +696,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log); - var outputProducer = new OutputProducer(pipe.Writer, "0", serviceContext.Log); + var outputProducer = new OutputProducer(pipe, "0", serviceContext.Log); var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log); frame.Output = outputProducer;