diff --git a/src/Kestrel.Core/Internal/ConnectionDispatcher.cs b/src/Kestrel.Core/Internal/ConnectionDispatcher.cs index f2d512b955..09c9c08df9 100644 --- a/src/Kestrel.Core/Internal/ConnectionDispatcher.cs +++ b/src/Kestrel.Core/Internal/ConnectionDispatcher.cs @@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal private IKestrelTrace Log => _serviceContext.Log; - public void OnConnection(TransportConnection connection) + public Task OnConnection(TransportConnection connection) { // REVIEW: Unfortunately, we still need to use the service context to create the pipes since the settings // for the scheduler and limits are specified here @@ -44,9 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal // This *must* be set before returning from OnConnection connection.Application = pair.Application; - // REVIEW: This task should be tracked by the server for graceful shutdown - // Today it's handled specifically for http but not for aribitrary middleware - _ = Execute(connection); + return Execute(connection); } private async Task Execute(ConnectionContext connectionContext) diff --git a/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs b/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs index 907115dd9a..d573c74e23 100644 --- a/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs +++ b/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs @@ -55,6 +55,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal }; var connectionFeature = connectionContext.Features.Get(); + var lifetimeFeature = connectionContext.Features.Get(); if (connectionFeature != null) { @@ -70,46 +71,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } var connection = new HttpConnection(httpConnectionContext); - var inputCompletionState = new PipeCompletionState(connection); - var outputCompletionState = new PipeCompletionState(connection); var processingTask = connection.StartRequestProcessing(_application); connectionContext.Transport.Input.OnWriterCompleted( - (error, state) => ((PipeCompletionState)state).CompletionCallback(error), - inputCompletionState); + (error, state) => ((HttpConnection)state).Abort(error), + connection); connectionContext.Transport.Output.OnReaderCompleted( - (error, state) => ((PipeCompletionState)state).CompletionCallback(error), - outputCompletionState); + (error, state) => ((HttpConnection)state).Abort(error), + connection); - await inputCompletionState.CompletionTask; - await outputCompletionState.CompletionTask; + await AsTask(lifetimeFeature.ConnectionClosed); connection.OnConnectionClosed(); await processingTask; } - private class PipeCompletionState + private Task AsTask(CancellationToken token) { - private readonly HttpConnection _connection; - private readonly TaskCompletionSource _completionTcs = new TaskCompletionSource(); - - public PipeCompletionState(HttpConnection connection) - { - _connection = connection; - CompletionTask = _completionTcs.Task; - } - - - public Task CompletionTask { get; } - - public void CompletionCallback(Exception error) - { - _connection.Abort(error); - _completionTcs.SetResult(null); - } + var tcs = new TaskCompletionSource(); + token.Register(() => tcs.SetResult(null)); + return tcs.Task; } } } diff --git a/src/Kestrel.Transport.Abstractions/Internal/IConnectionDispatcher.cs b/src/Kestrel.Transport.Abstractions/Internal/IConnectionDispatcher.cs index 05b7d84ea2..813c541d1a 100644 --- a/src/Kestrel.Transport.Abstractions/Internal/IConnectionDispatcher.cs +++ b/src/Kestrel.Transport.Abstractions/Internal/IConnectionDispatcher.cs @@ -1,12 +1,12 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using Microsoft.AspNetCore.Http.Features; +using System.Threading.Tasks; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal { public interface IConnectionDispatcher { - void OnConnection(TransportConnection connection); + Task OnConnection(TransportConnection connection); } } diff --git a/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 4ba4e94152..db18aed00b 100644 --- a/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -14,7 +14,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { - public partial class LibuvConnection : TransportConnection + public partial class LibuvConnection : TransportConnection, IDisposable { private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2; @@ -111,6 +111,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal Thread.Post(s => s.Dispose(), _socket); } + // Only called after connection middleware is complete which means the ConnectionClosed token has fired. + public void Dispose() + { + _connectionClosedTokenSource.Dispose(); + } + // Called on Libuv thread private static LibuvFunctions.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state) { @@ -223,7 +229,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal try { _connectionClosedTokenSource.Cancel(); - _connectionClosedTokenSource.Dispose(); } catch (Exception ex) { diff --git a/src/Kestrel.Transport.Libuv/Internal/Listener.cs b/src/Kestrel.Transport.Libuv/Internal/Listener.cs index 230be06dea..c6e9e2b493 100644 --- a/src/Kestrel.Transport.Libuv/Internal/Listener.cs +++ b/src/Kestrel.Transport.Libuv/Internal/Listener.cs @@ -181,11 +181,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal protected virtual void DispatchConnection(UvStreamHandle socket) { - var connection = new LibuvConnection(socket, Log, Thread); - - TransportContext.ConnectionDispatcher.OnConnection(connection); - - _ = connection.Start(); + // REVIEW: This task should be tracked by the server for graceful shutdown + // Today it's handled specifically for http but not for aribitrary middleware + _ = HandleConnectionAsync(socket); } public virtual async Task DisposeAsync() diff --git a/src/Kestrel.Transport.Libuv/Internal/ListenerContext.cs b/src/Kestrel.Transport.Libuv/Internal/ListenerContext.cs index 37ea4eed36..b8d5d7087e 100644 --- a/src/Kestrel.Transport.Libuv/Internal/ListenerContext.cs +++ b/src/Kestrel.Transport.Libuv/Internal/ListenerContext.cs @@ -2,8 +2,10 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { @@ -38,6 +40,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } + protected async Task HandleConnectionAsync(UvStreamHandle socket) + { + try + { + var connection = new LibuvConnection(socket, TransportContext.Log, Thread); + var middlewareTask = TransportContext.ConnectionDispatcher.OnConnection(connection); + var transportTask = connection.Start(); + + await transportTask; + await middlewareTask; + + connection.Dispose(); + } + catch (Exception ex) + { + TransportContext.Log.LogCritical(ex, $"Unexpected exception in {nameof(ListenerContext)}.{nameof(HandleConnectionAsync)}."); + } + } + private UvTcpHandle AcceptTcp() { var socket = new UvTcpHandle(TransportContext.Log); diff --git a/src/Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs b/src/Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs index 63348046ac..7c265ad228 100644 --- a/src/Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs +++ b/src/Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs @@ -159,19 +159,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal return; } - try - { - var connection = new LibuvConnection(acceptSocket, Log, Thread); - - TransportContext.ConnectionDispatcher.OnConnection(connection); - - _ = connection.Start(); - } - catch (UvException ex) - { - Log.LogError(0, ex, "ListenerSecondary.OnConnection"); - acceptSocket.Dispose(); - } + // REVIEW: This task should be tracked by the server for graceful shutdown + // Today it's handled specifically for http but not for aribitrary middleware + _ = HandleConnectionAsync(acceptSocket); } private void FreeBuffer() diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs index 06c73f685b..b59390ed71 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs @@ -17,7 +17,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { - internal sealed class SocketConnection : TransportConnection + internal sealed class SocketConnection : TransportConnection, IDisposable { private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2; @@ -91,6 +91,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal Shutdown(); } + // Only called after connection middleware is complete which means the ConnectionClosed token has fired. + public void Dispose() + { + _connectionClosedTokenSource.Dispose(); + } + private async Task DoReceive() { Exception error = null; @@ -281,7 +287,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal try { _connectionClosedTokenSource.Cancel(); - _connectionClosedTokenSource.Dispose(); } catch (Exception ex) { diff --git a/src/Kestrel.Transport.Sockets/SocketTransport.cs b/src/Kestrel.Transport.Sockets/SocketTransport.cs index 824f05629f..f3745e95a6 100644 --- a/src/Kestrel.Transport.Sockets/SocketTransport.cs +++ b/src/Kestrel.Transport.Sockets/SocketTransport.cs @@ -158,9 +158,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[schedulerIndex], _trace); - _dispatcher.OnConnection(connection); - - _ = connection.StartAsync(); + // REVIEW: This task should be tracked by the server for graceful shutdown + // Today it's handled specifically for http but not for aribitrary middleware + _ = HandleConnectionAsync(connection); } catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) { @@ -182,7 +182,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } else { - _trace.LogCritical(ex, $"Unexpected exeption in {nameof(SocketTransport)}.{nameof(RunAcceptLoopAsync)}."); + _trace.LogCritical(ex, $"Unexpected exception in {nameof(SocketTransport)}.{nameof(RunAcceptLoopAsync)}."); _listenException = ex; // Request shutdown so we can rethrow this exception @@ -192,6 +192,24 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } } + private async Task HandleConnectionAsync(SocketConnection connection) + { + try + { + var middlewareTask = _dispatcher.OnConnection(connection); + var transportTask = connection.StartAsync(); + + await transportTask; + await middlewareTask; + + connection.Dispose(); + } + catch (Exception ex) + { + _trace.LogCritical(ex, $"Unexpected exception in {nameof(SocketTransport)}.{nameof(HandleConnectionAsync)}."); + } + } + [DllImport("libc", SetLastError = true)] private static extern int setsockopt(int socket, int level, int option_name, IntPtr option_value, uint option_len); diff --git a/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionDispatcher.cs b/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionDispatcher.cs index e4f17bbda4..01e0c049d0 100644 --- a/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionDispatcher.cs +++ b/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionDispatcher.cs @@ -4,9 +4,7 @@ using System; using System.Buffers; using System.IO.Pipelines; -using Microsoft.AspNetCore.Http.Features; -using Microsoft.AspNetCore.Connections; -using Microsoft.AspNetCore.Connections.Features; +using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers @@ -16,13 +14,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers public Func, PipeOptions> InputOptions { get; set; } = pool => new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); public Func, PipeOptions> OutputOptions { get; set; } = pool => new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); - public void OnConnection(TransportConnection connection) + public Task OnConnection(TransportConnection connection) { Input = new Pipe(InputOptions(connection.MemoryPool)); Output = new Pipe(OutputOptions(connection.MemoryPool)); connection.Transport = new DuplexPipe(Input.Reader, Output.Writer); connection.Application = new DuplexPipe(Output.Reader, Input.Writer); + + return Task.CompletedTask; } public Pipe Input { get; private set; }