Dispatch connection execution (#12265)
- Dispatch connection initialization to the thread pool to avoid executing the first read on the IO thread (longer explanation in the bug).
This commit is contained in:
parent
8ce68d04c1
commit
7966f259ea
|
|
@ -51,7 +51,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
break;
|
||||
}
|
||||
|
||||
_ = Execute(new KestrelConnection(connection, _serviceContext.Log));
|
||||
var id = Interlocked.Increment(ref _lastConnectionId);
|
||||
var kestrelConnection = new KestrelConnection(id, _serviceContext, _connectionDelegate, connection, _serviceContext.Log);
|
||||
ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
|
@ -65,55 +67,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task Execute(KestrelConnection connection)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _lastConnectionId);
|
||||
var connectionContext = connection.TransportConnection;
|
||||
|
||||
try
|
||||
{
|
||||
_serviceContext.ConnectionManager.AddConnection(id, connection);
|
||||
|
||||
Log.ConnectionStart(connectionContext.ConnectionId);
|
||||
KestrelEventSource.Log.ConnectionStart(connectionContext);
|
||||
|
||||
using (BeginConnectionScope(connectionContext))
|
||||
{
|
||||
try
|
||||
{
|
||||
await _connectionDelegate(connectionContext);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await connection.FireOnCompletedAsync();
|
||||
|
||||
Log.ConnectionStop(connectionContext.ConnectionId);
|
||||
KestrelEventSource.Log.ConnectionStop(connectionContext);
|
||||
|
||||
// Dispose the transport connection, this needs to happen before removing it from the
|
||||
// connection manager so that we only signal completion of this connection after the transport
|
||||
// is properly torn down.
|
||||
await connection.TransportConnection.DisposeAsync();
|
||||
|
||||
_serviceContext.ConnectionManager.RemoveConnection(id);
|
||||
}
|
||||
}
|
||||
|
||||
private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
|
||||
{
|
||||
if (Log.IsEnabled(LogLevel.Critical))
|
||||
{
|
||||
return Log.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging;
|
|||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
||||
{
|
||||
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature
|
||||
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature, IThreadPoolWorkItem
|
||||
{
|
||||
private List<(Action<object> handler, object state)> _heartbeatHandlers;
|
||||
private readonly object _heartbeatLock = new object();
|
||||
|
|
@ -21,9 +21,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
private readonly CancellationTokenSource _connectionClosingCts = new CancellationTokenSource();
|
||||
private readonly TaskCompletionSource<object> _completionTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly long _id;
|
||||
private readonly ServiceContext _serviceContext;
|
||||
private readonly ConnectionDelegate _connectionDelegate;
|
||||
|
||||
public KestrelConnection(ConnectionContext connectionContext, ILogger logger)
|
||||
public KestrelConnection(long id,
|
||||
ServiceContext serviceContext,
|
||||
ConnectionDelegate connectionDelegate,
|
||||
ConnectionContext connectionContext,
|
||||
IKestrelTrace logger)
|
||||
{
|
||||
_id = id;
|
||||
_serviceContext = serviceContext;
|
||||
_connectionDelegate = connectionDelegate;
|
||||
Logger = logger;
|
||||
TransportConnection = connectionContext;
|
||||
|
||||
|
|
@ -33,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
ConnectionClosedRequested = _connectionClosingCts.Token;
|
||||
}
|
||||
|
||||
private ILogger Logger { get; }
|
||||
private IKestrelTrace Logger { get; }
|
||||
|
||||
public ConnectionContext TransportConnection { get; set; }
|
||||
public CancellationToken ConnectionClosedRequested { get; set; }
|
||||
|
|
@ -164,5 +174,59 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
_connectionClosingCts.Dispose();
|
||||
}
|
||||
|
||||
void IThreadPoolWorkItem.Execute()
|
||||
{
|
||||
_ = ExecuteAsync();
|
||||
}
|
||||
|
||||
internal async Task ExecuteAsync()
|
||||
{
|
||||
var connectionContext = TransportConnection;
|
||||
|
||||
try
|
||||
{
|
||||
_serviceContext.ConnectionManager.AddConnection(_id, this);
|
||||
|
||||
Logger.ConnectionStart(connectionContext.ConnectionId);
|
||||
KestrelEventSource.Log.ConnectionStart(connectionContext);
|
||||
|
||||
using (BeginConnectionScope(connectionContext))
|
||||
{
|
||||
try
|
||||
{
|
||||
await _connectionDelegate(connectionContext);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await FireOnCompletedAsync();
|
||||
|
||||
Logger.ConnectionStop(connectionContext.ConnectionId);
|
||||
KestrelEventSource.Log.ConnectionStop(connectionContext);
|
||||
|
||||
// Dispose the transport connection, this needs to happen before removing it from the
|
||||
// connection manager so that we only signal completion of this connection after the transport
|
||||
// is properly torn down.
|
||||
await TransportConnection.DisposeAsync();
|
||||
|
||||
_serviceContext.ConnectionManager.RemoveConnection(_id);
|
||||
}
|
||||
}
|
||||
|
||||
private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
|
||||
{
|
||||
if (Logger.IsEnabled(LogLevel.Critical))
|
||||
{
|
||||
return Logger.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,17 +21,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
public class ConnectionDispatcherTests
|
||||
{
|
||||
[Fact]
|
||||
public void OnConnectionCreatesLogScopeWithConnectionId()
|
||||
public async Task OnConnectionCreatesLogScopeWithConnectionId()
|
||||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
// This needs to run inline
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
var dispatcher = new ConnectionDispatcher(serviceContext, _ => tcs.Task);
|
||||
|
||||
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
|
||||
connection.ConnectionClosed = new CancellationToken(canceled: true);
|
||||
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => tcs.Task, connection, serviceContext.Log);
|
||||
|
||||
_ = dispatcher.Execute(new KestrelConnection(connection, Mock.Of<ILogger>()));
|
||||
var task = kestrelConnection.ExecuteAsync();
|
||||
|
||||
// The scope should be created
|
||||
var scopeObjects = ((TestKestrelTrace)serviceContext.Log)
|
||||
|
|
@ -47,6 +47,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
|
||||
tcs.TrySetResult(null);
|
||||
|
||||
await task;
|
||||
|
||||
// Verify the scope was disposed after request processing completed
|
||||
Assert.True(((TestKestrelTrace)serviceContext.Log).Logger.Scopes.IsEmpty);
|
||||
}
|
||||
|
|
@ -73,11 +75,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
public async Task OnConnectionFiresOnCompleted()
|
||||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);
|
||||
|
||||
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
|
||||
connection.ConnectionClosed = new CancellationToken(canceled: true);
|
||||
var kestrelConnection = new KestrelConnection(connection, Mock.Of<ILogger>());
|
||||
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
|
||||
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();
|
||||
|
||||
Assert.NotNull(completeFeature);
|
||||
|
|
@ -85,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
object callbackState = null;
|
||||
completeFeature.OnCompleted(state => { callbackState = state; return Task.CompletedTask; }, stateObject);
|
||||
|
||||
await dispatcher.Execute(kestrelConnection);
|
||||
await kestrelConnection.ExecuteAsync();
|
||||
|
||||
Assert.Equal(stateObject, callbackState);
|
||||
}
|
||||
|
|
@ -94,12 +95,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
public async Task OnConnectionOnCompletedExceptionCaught()
|
||||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);
|
||||
|
||||
var logger = ((TestKestrelTrace)serviceContext.Log).Logger;
|
||||
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
|
||||
connection.ConnectionClosed = new CancellationToken(canceled: true);
|
||||
var mockLogger = new Mock<ILogger>();
|
||||
var kestrelConnection = new KestrelConnection(connection, mockLogger.Object);
|
||||
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
|
||||
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();
|
||||
|
||||
Assert.NotNull(completeFeature);
|
||||
|
|
@ -107,12 +106,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
object callbackState = null;
|
||||
completeFeature.OnCompleted(state => { callbackState = state; throw new InvalidTimeZoneException(); }, stateObject);
|
||||
|
||||
await dispatcher.Execute(kestrelConnection);
|
||||
await kestrelConnection.ExecuteAsync();
|
||||
|
||||
Assert.Equal(stateObject, callbackState);
|
||||
var log = mockLogger.Invocations.First();
|
||||
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", log.Arguments[2].ToString());
|
||||
Assert.IsType<InvalidTimeZoneException>(log.Arguments[3]);
|
||||
var errors = logger.Messages.Where(e => e.LogLevel >= LogLevel.Error).ToArray();
|
||||
Assert.Single(errors);
|
||||
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", errors[0].Message);
|
||||
}
|
||||
|
||||
private class ThrowingListener : IConnectionListener
|
||||
|
|
|
|||
|
|
@ -3,8 +3,10 @@
|
|||
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
|
@ -39,9 +41,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
ConnectionManager httpConnectionManager,
|
||||
Mock<IKestrelTrace> trace)
|
||||
{
|
||||
var serviceContext = new TestServiceContext();
|
||||
var mock = new Mock<DefaultConnectionContext>() { CallBase = true };
|
||||
mock.Setup(m => m.ConnectionId).Returns(connectionId);
|
||||
var httpConnection = new KestrelConnection(mock.Object, Mock.Of<ILogger>());
|
||||
var httpConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, mock.Object, Mock.Of<IKestrelTrace>());
|
||||
|
||||
httpConnectionManager.AddConnection(0, httpConnection);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue