Wait off the ConnectionClosed token to stop tracking ConnectionCallback (#2574)

- The prior strategy of waiting for the pipe completed callbacks doesn't work
  because blocks are returned to the memory pool after the callbacks are fired.
This commit is contained in:
Stephen Halter 2018-05-15 12:38:35 -07:00 committed by GitHub
parent c683316253
commit f8f6f39f55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 81 additions and 62 deletions

View File

@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private IKestrelTrace Log => _serviceContext.Log;
public void OnConnection(TransportConnection connection)
public Task OnConnection(TransportConnection connection)
{
// REVIEW: Unfortunately, we still need to use the service context to create the pipes since the settings
// for the scheduler and limits are specified here
@ -44,9 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
// This *must* be set before returning from OnConnection
connection.Application = pair.Application;
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = Execute(connection);
return Execute(connection);
}
private async Task Execute(ConnectionContext connectionContext)

View File

@ -55,6 +55,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
};
var connectionFeature = connectionContext.Features.Get<IHttpConnectionFeature>();
var lifetimeFeature = connectionContext.Features.Get<IConnectionLifetimeFeature>();
if (connectionFeature != null)
{
@ -70,46 +71,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
var connection = new HttpConnection(httpConnectionContext);
var inputCompletionState = new PipeCompletionState(connection);
var outputCompletionState = new PipeCompletionState(connection);
var processingTask = connection.StartRequestProcessing(_application);
connectionContext.Transport.Input.OnWriterCompleted(
(error, state) => ((PipeCompletionState)state).CompletionCallback(error),
inputCompletionState);
(error, state) => ((HttpConnection)state).Abort(error),
connection);
connectionContext.Transport.Output.OnReaderCompleted(
(error, state) => ((PipeCompletionState)state).CompletionCallback(error),
outputCompletionState);
(error, state) => ((HttpConnection)state).Abort(error),
connection);
await inputCompletionState.CompletionTask;
await outputCompletionState.CompletionTask;
await AsTask(lifetimeFeature.ConnectionClosed);
connection.OnConnectionClosed();
await processingTask;
}
private class PipeCompletionState
private Task AsTask(CancellationToken token)
{
private readonly HttpConnection _connection;
private readonly TaskCompletionSource<object> _completionTcs = new TaskCompletionSource<object>();
public PipeCompletionState(HttpConnection connection)
{
_connection = connection;
CompletionTask = _completionTcs.Task;
}
public Task CompletionTask { get; }
public void CompletionCallback(Exception error)
{
_connection.Abort(error);
_completionTcs.SetResult(null);
}
var tcs = new TaskCompletionSource<object>();
token.Register(() => tcs.SetResult(null));
return tcs.Task;
}
}
}

View File

@ -1,12 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Http.Features;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
{
public interface IConnectionDispatcher
{
void OnConnection(TransportConnection connection);
Task OnConnection(TransportConnection connection);
}
}

View File

@ -14,7 +14,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
public partial class LibuvConnection : TransportConnection
public partial class LibuvConnection : TransportConnection, IDisposable
{
private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2;
@ -111,6 +111,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
Thread.Post(s => s.Dispose(), _socket);
}
// Only called after connection middleware is complete which means the ConnectionClosed token has fired.
public void Dispose()
{
_connectionClosedTokenSource.Dispose();
}
// Called on Libuv thread
private static LibuvFunctions.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
{
@ -223,7 +229,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
try
{
_connectionClosedTokenSource.Cancel();
_connectionClosedTokenSource.Dispose();
}
catch (Exception ex)
{

View File

@ -181,11 +181,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
protected virtual void DispatchConnection(UvStreamHandle socket)
{
var connection = new LibuvConnection(socket, Log, Thread);
TransportContext.ConnectionDispatcher.OnConnection(connection);
_ = connection.Start();
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = HandleConnectionAsync(socket);
}
public virtual async Task DisposeAsync()

View File

@ -2,8 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
@ -38,6 +40,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
protected async Task HandleConnectionAsync(UvStreamHandle socket)
{
try
{
var connection = new LibuvConnection(socket, TransportContext.Log, Thread);
var middlewareTask = TransportContext.ConnectionDispatcher.OnConnection(connection);
var transportTask = connection.Start();
await transportTask;
await middlewareTask;
connection.Dispose();
}
catch (Exception ex)
{
TransportContext.Log.LogCritical(ex, $"Unexpected exception in {nameof(ListenerContext)}.{nameof(HandleConnectionAsync)}.");
}
}
private UvTcpHandle AcceptTcp()
{
var socket = new UvTcpHandle(TransportContext.Log);

View File

@ -159,19 +159,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
return;
}
try
{
var connection = new LibuvConnection(acceptSocket, Log, Thread);
TransportContext.ConnectionDispatcher.OnConnection(connection);
_ = connection.Start();
}
catch (UvException ex)
{
Log.LogError(0, ex, "ListenerSecondary.OnConnection");
acceptSocket.Dispose();
}
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = HandleConnectionAsync(acceptSocket);
}
private void FreeBuffer()

View File

@ -17,7 +17,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketConnection : TransportConnection
internal sealed class SocketConnection : TransportConnection, IDisposable
{
private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2;
@ -91,6 +91,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
Shutdown();
}
// Only called after connection middleware is complete which means the ConnectionClosed token has fired.
public void Dispose()
{
_connectionClosedTokenSource.Dispose();
}
private async Task DoReceive()
{
Exception error = null;
@ -281,7 +287,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
try
{
_connectionClosedTokenSource.Cancel();
_connectionClosedTokenSource.Dispose();
}
catch (Exception ex)
{

View File

@ -158,9 +158,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[schedulerIndex], _trace);
_dispatcher.OnConnection(connection);
_ = connection.StartAsync();
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = HandleConnectionAsync(connection);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
@ -182,7 +182,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
}
else
{
_trace.LogCritical(ex, $"Unexpected exeption in {nameof(SocketTransport)}.{nameof(RunAcceptLoopAsync)}.");
_trace.LogCritical(ex, $"Unexpected exception in {nameof(SocketTransport)}.{nameof(RunAcceptLoopAsync)}.");
_listenException = ex;
// Request shutdown so we can rethrow this exception
@ -192,6 +192,24 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
}
}
private async Task HandleConnectionAsync(SocketConnection connection)
{
try
{
var middlewareTask = _dispatcher.OnConnection(connection);
var transportTask = connection.StartAsync();
await transportTask;
await middlewareTask;
connection.Dispose();
}
catch (Exception ex)
{
_trace.LogCritical(ex, $"Unexpected exception in {nameof(SocketTransport)}.{nameof(HandleConnectionAsync)}.");
}
}
[DllImport("libc", SetLastError = true)]
private static extern int setsockopt(int socket, int level, int option_name, IntPtr option_value, uint option_len);

View File

@ -4,9 +4,7 @@
using System;
using System.Buffers;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
@ -16,13 +14,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
public Func<MemoryPool<byte>, PipeOptions> InputOptions { get; set; } = pool => new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
public Func<MemoryPool<byte>, PipeOptions> OutputOptions { get; set; } = pool => new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
public void OnConnection(TransportConnection connection)
public Task OnConnection(TransportConnection connection)
{
Input = new Pipe(InputOptions(connection.MemoryPool));
Output = new Pipe(OutputOptions(connection.MemoryPool));
connection.Transport = new DuplexPipe(Input.Reader, Output.Writer);
connection.Application = new DuplexPipe(Output.Reader, Input.Writer);
return Task.CompletedTask;
}
public Pipe Input { get; private set; }