From 7966f259ea87f80ae7dec07956aaa416309c3ba8 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 18 Jul 2019 06:36:08 -0700 Subject: [PATCH] Dispatch connection execution (#12265) - Dispatch connection initialization to the thread pool to avoid executing the first read on the IO thread (longer explanation in the bug). --- .../Core/src/Internal/ConnectionDispatcher.cs | 54 +------------- .../Infrastructure/KestrelConnection.cs | 70 ++++++++++++++++++- .../Core/test/ConnectionDispatcherTests.cs | 27 ++++--- .../Core/test/HttpConnectionManagerTests.cs | 5 +- 4 files changed, 87 insertions(+), 69 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs b/src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs index 8670469ea5..603873646a 100644 --- a/src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs +++ b/src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs @@ -51,7 +51,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal break; } - _ = Execute(new KestrelConnection(connection, _serviceContext.Log)); + var id = Interlocked.Increment(ref _lastConnectionId); + var kestrelConnection = new KestrelConnection(id, _serviceContext, _connectionDelegate, connection, _serviceContext.Log); + ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false); } } catch (Exception ex) @@ -65,55 +67,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } } } - - internal async Task Execute(KestrelConnection connection) - { - var id = Interlocked.Increment(ref _lastConnectionId); - var connectionContext = connection.TransportConnection; - - try - { - _serviceContext.ConnectionManager.AddConnection(id, connection); - - Log.ConnectionStart(connectionContext.ConnectionId); - KestrelEventSource.Log.ConnectionStart(connectionContext); - - using (BeginConnectionScope(connectionContext)) - { - try - { - await _connectionDelegate(connectionContext); - } - catch (Exception ex) - { - Log.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId); - } - } - } - finally - { - await connection.FireOnCompletedAsync(); - - Log.ConnectionStop(connectionContext.ConnectionId); - KestrelEventSource.Log.ConnectionStop(connectionContext); - - // Dispose the transport connection, this needs to happen before removing it from the - // connection manager so that we only signal completion of this connection after the transport - // is properly torn down. - await connection.TransportConnection.DisposeAsync(); - - _serviceContext.ConnectionManager.RemoveConnection(id); - } - } - - private IDisposable BeginConnectionScope(ConnectionContext connectionContext) - { - if (Log.IsEnabled(LogLevel.Critical)) - { - return Log.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId)); - } - - return null; - } } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelConnection.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelConnection.cs index 9cbf8f12a1..c885f80e0a 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelConnection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelConnection.cs @@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure { - internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature + internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature, IThreadPoolWorkItem { private List<(Action handler, object state)> _heartbeatHandlers; private readonly object _heartbeatLock = new object(); @@ -21,9 +21,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure private readonly CancellationTokenSource _connectionClosingCts = new CancellationTokenSource(); private readonly TaskCompletionSource _completionTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly long _id; + private readonly ServiceContext _serviceContext; + private readonly ConnectionDelegate _connectionDelegate; - public KestrelConnection(ConnectionContext connectionContext, ILogger logger) + public KestrelConnection(long id, + ServiceContext serviceContext, + ConnectionDelegate connectionDelegate, + ConnectionContext connectionContext, + IKestrelTrace logger) { + _id = id; + _serviceContext = serviceContext; + _connectionDelegate = connectionDelegate; Logger = logger; TransportConnection = connectionContext; @@ -33,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure ConnectionClosedRequested = _connectionClosingCts.Token; } - private ILogger Logger { get; } + private IKestrelTrace Logger { get; } public ConnectionContext TransportConnection { get; set; } public CancellationToken ConnectionClosedRequested { get; set; } @@ -164,5 +174,59 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure _connectionClosingCts.Dispose(); } + + void IThreadPoolWorkItem.Execute() + { + _ = ExecuteAsync(); + } + + internal async Task ExecuteAsync() + { + var connectionContext = TransportConnection; + + try + { + _serviceContext.ConnectionManager.AddConnection(_id, this); + + Logger.ConnectionStart(connectionContext.ConnectionId); + KestrelEventSource.Log.ConnectionStart(connectionContext); + + using (BeginConnectionScope(connectionContext)) + { + try + { + await _connectionDelegate(connectionContext); + } + catch (Exception ex) + { + Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId); + } + } + } + finally + { + await FireOnCompletedAsync(); + + Logger.ConnectionStop(connectionContext.ConnectionId); + KestrelEventSource.Log.ConnectionStop(connectionContext); + + // Dispose the transport connection, this needs to happen before removing it from the + // connection manager so that we only signal completion of this connection after the transport + // is properly torn down. + await TransportConnection.DisposeAsync(); + + _serviceContext.ConnectionManager.RemoveConnection(_id); + } + } + + private IDisposable BeginConnectionScope(ConnectionContext connectionContext) + { + if (Logger.IsEnabled(LogLevel.Critical)) + { + return Logger.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId)); + } + + return null; + } } } diff --git a/src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs b/src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs index cf2b23de77..de6f33c28d 100644 --- a/src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs +++ b/src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs @@ -21,17 +21,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests public class ConnectionDispatcherTests { [Fact] - public void OnConnectionCreatesLogScopeWithConnectionId() + public async Task OnConnectionCreatesLogScopeWithConnectionId() { var serviceContext = new TestServiceContext(); // This needs to run inline var tcs = new TaskCompletionSource(); - var dispatcher = new ConnectionDispatcher(serviceContext, _ => tcs.Task); var connection = new Mock { CallBase = true }.Object; connection.ConnectionClosed = new CancellationToken(canceled: true); + var kestrelConnection = new KestrelConnection(0, serviceContext, _ => tcs.Task, connection, serviceContext.Log); - _ = dispatcher.Execute(new KestrelConnection(connection, Mock.Of())); + var task = kestrelConnection.ExecuteAsync(); // The scope should be created var scopeObjects = ((TestKestrelTrace)serviceContext.Log) @@ -47,6 +47,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests tcs.TrySetResult(null); + await task; + // Verify the scope was disposed after request processing completed Assert.True(((TestKestrelTrace)serviceContext.Log).Logger.Scopes.IsEmpty); } @@ -73,11 +75,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests public async Task OnConnectionFiresOnCompleted() { var serviceContext = new TestServiceContext(); - var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask); var connection = new Mock { CallBase = true }.Object; connection.ConnectionClosed = new CancellationToken(canceled: true); - var kestrelConnection = new KestrelConnection(connection, Mock.Of()); + var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log); var completeFeature = kestrelConnection.TransportConnection.Features.Get(); Assert.NotNull(completeFeature); @@ -85,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests object callbackState = null; completeFeature.OnCompleted(state => { callbackState = state; return Task.CompletedTask; }, stateObject); - await dispatcher.Execute(kestrelConnection); + await kestrelConnection.ExecuteAsync(); Assert.Equal(stateObject, callbackState); } @@ -94,12 +95,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests public async Task OnConnectionOnCompletedExceptionCaught() { var serviceContext = new TestServiceContext(); - var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask); - + var logger = ((TestKestrelTrace)serviceContext.Log).Logger; var connection = new Mock { CallBase = true }.Object; connection.ConnectionClosed = new CancellationToken(canceled: true); - var mockLogger = new Mock(); - var kestrelConnection = new KestrelConnection(connection, mockLogger.Object); + var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log); var completeFeature = kestrelConnection.TransportConnection.Features.Get(); Assert.NotNull(completeFeature); @@ -107,12 +106,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests object callbackState = null; completeFeature.OnCompleted(state => { callbackState = state; throw new InvalidTimeZoneException(); }, stateObject); - await dispatcher.Execute(kestrelConnection); + await kestrelConnection.ExecuteAsync(); Assert.Equal(stateObject, callbackState); - var log = mockLogger.Invocations.First(); - Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", log.Arguments[2].ToString()); - Assert.IsType(log.Arguments[3]); + var errors = logger.Messages.Where(e => e.LogLevel >= LogLevel.Error).ToArray(); + Assert.Single(errors); + Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", errors[0].Message); } private class ThrowingListener : IConnectionListener diff --git a/src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs b/src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs index ab44ae3bc3..815fe5019a 100644 --- a/src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs +++ b/src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs @@ -3,8 +3,10 @@ using System; using System.Runtime.CompilerServices; +using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Logging; using Moq; using Xunit; @@ -39,9 +41,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests ConnectionManager httpConnectionManager, Mock trace) { + var serviceContext = new TestServiceContext(); var mock = new Mock() { CallBase = true }; mock.Setup(m => m.ConnectionId).Returns(connectionId); - var httpConnection = new KestrelConnection(mock.Object, Mock.Of()); + var httpConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, mock.Object, Mock.Of()); httpConnectionManager.AddConnection(0, httpConnection);