From 7aa7b3e0a9fbb0f16a4a9f8ae19fc29155c788d7 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 12 Apr 2017 11:56:02 -0700 Subject: [PATCH] Track connections until request processing completes - Paves the way to allow request processing to continue during server shutdown even after a client disconnects. --- .../Internal/FrameConnection.cs | 10 +- .../Internal/Http/Frame.FeatureCollection.cs | 2 +- .../Internal/Http/Frame.cs | 2 +- .../Internal/Http/OutputProducer.cs | 23 ++- .../Internal/LibuvConnection.cs | 53 +++--- .../Internal/LibuvConnectionManager.cs | 2 +- .../Internal/LibuvOutputConsumer.cs | 31 +--- .../LibuvOutputConsumerTests.cs | 158 +++++++++++------- .../TestHelpers/MockConnection.cs | 42 ----- 9 files changed, 163 insertions(+), 160 deletions(-) delete mode 100644 test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index d847c2aca6..50b2aebae7 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -7,7 +7,6 @@ using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter; using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; @@ -88,11 +87,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } } - public void OnConnectionClosed() + public async void OnConnectionClosed() { - _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId); Log.ConnectionStop(ConnectionId); KestrelEventSource.Log.ConnectionStop(this); + + // The connection is already in the "aborted" state by this point, but we want to track it + // until RequestProcessingAsync completes for graceful shutdown. + await StopAsync(); + + _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId); } public async Task StopAsync() diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs index abf4bddf7d..58180c0f69 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.FeatureCollection.cs @@ -253,7 +253,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http void IHttpRequestLifetimeFeature.Abort() { - Abort(); + Abort(error: null); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs index 9dc9106e48..775fed6858 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs @@ -451,7 +451,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http /// /// Immediate kill the connection and poison the request and response streams. /// - public void Abort(Exception error = null) + public void Abort(Exception error) { if (Interlocked.Exchange(ref _requestAborted, 1) == 0) { diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index 261db5c1d7..1ced4a5b25 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -31,7 +31,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // this is temporary until it does private TaskCompletionSource _flushTcs; private readonly object _flushLock = new object(); - private readonly Action _onFlushCallback; public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log) { @@ -39,7 +38,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _frame = frame; _connectionId = connectionId; _log = log; - _onFlushCallback = OnFlush; } public Task WriteAsync( @@ -87,6 +85,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http var awaitable = writableBuffer.FlushAsync(); if (awaitable.IsCompleted) { + AbortIfNeeded(awaitable); + // The flush task can't fail today return TaskCache.CompletedTask; } @@ -105,16 +105,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { _flushTcs = new TaskCompletionSource(); - awaitable.OnCompleted(_onFlushCallback); + awaitable.OnCompleted(() => + { + AbortIfNeeded(awaitable); + _flushTcs.TrySetResult(null); + }); } } return _flushTcs.Task; } - private void OnFlush() + private void AbortIfNeeded(WritableBufferAwaitable awaitable) { - _flushTcs.TrySetResult(null); + try + { + awaitable.GetResult(); + } + catch (Exception ex) + { + _frame.Abort(ex); + } } void ISocketOutput.Write(ArraySegment buffer, bool chunk) @@ -126,7 +137,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { if (cancellationToken.IsCancellationRequested) { - _frame.Abort(); + _frame.Abort(error: null); _cancelled = true; return Task.FromCanceled(cancellationToken); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 483a4fd050..a6d7b6ac82 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -56,7 +56,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private IConnectionHandler ConnectionHandler => ListenerContext.TransportContext.ConnectionHandler; private LibuvThread Thread => ListenerContext.Thread; - public void Start() + public async void Start() { try { @@ -64,45 +64,54 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal ConnectionId = _connectionContext.ConnectionId; Input = _connectionContext.Input; - Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, this, ConnectionId, Log); + Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, ConnectionId, Log); // Start socket prior to applying the ConnectionAdapter _socket.ReadStart(_allocCallback, _readCallback, this); - // This *must* happen after socket.ReadStart - // The socket output consumer is the only thing that can close the connection. If the - // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards. - var ignore = Output.StartWrites(); + try + { + // This *must* happen after socket.ReadStart + // The socket output consumer is the only thing that can close the connection. If the + // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards. + await Output.WriteOutputAsync(); + _connectionContext.Output.Complete(); + } + catch (UvException ex) + { + _connectionContext.Output.Complete(ex); + } + finally + { + // Ensure the socket is disposed prior to completing in the input writer. + _socket.Dispose(); + Input.Complete(new TaskCanceledException("The request was aborted")); + _socketClosedTcs.TrySetResult(null); + } } catch (Exception e) { - Log.LogError(0, e, "Connection.StartFrame"); - throw; + Log.LogCritical(0, e, $"{nameof(LibuvConnection)}.{nameof(Start)}() {ConnectionId}"); + } + finally + { + _connectionContext.OnConnectionClosed(); } } - public Task StopAsync() + public async Task StopAsync() { - return Task.WhenAll(_connectionContext.StopAsync(), _socketClosedTcs.Task); + await _connectionContext.StopAsync(); + await _socketClosedTcs.Task; } - public virtual Task AbortAsync(Exception error = null) + public Task AbortAsync(Exception error) { _connectionContext.Abort(error); - return _socketClosedTcs.Task; + return StopAsync(); } // Called on Libuv thread - public virtual void Close() - { - _socket.Dispose(); - - _connectionContext.OnConnectionClosed(); - - Input.Complete(new TaskCanceledException("The request was aborted")); - _socketClosedTcs.TrySetResult(null); - } - private static LibuvFunctions.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state) { return ((LibuvConnection)state).OnAlloc(handle, suggestedSize); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs index 9cc99b919a..d52baf4765 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionManager.cs @@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private void WalkConnectionsAndAbortCore(TaskCompletionSource tcs) { - WalkConnectionsCore(connection => connection.AbortAsync(), tcs); + WalkConnectionsCore(connection => connection.AbortAsync(error: null), tcs); } private void WalkConnectionsCore(Func action, TaskCompletionSource tcs) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index c81cf3f9e9..4d12a2174f 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -12,7 +12,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { private readonly LibuvThread _thread; private readonly UvStreamHandle _socket; - private readonly LibuvConnection _connection; private readonly string _connectionId; private readonly ILibuvTrace _log; @@ -23,7 +22,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal IPipeReader pipe, LibuvThread thread, UvStreamHandle socket, - LibuvConnection connection, string connectionId, ILibuvTrace log) { @@ -32,13 +30,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // get's scheduled _thread = thread; _socket = socket; - _connection = connection; _connectionId = connectionId; _log = log; _writeReqPool = thread.WriteReqPool; } - public async Task StartWrites() + public async Task WriteOutputAsync() { while (true) { @@ -53,7 +50,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal var writeResult = await writeReq.WriteAsync(_socket, buffer); _writeReqPool.Return(writeReq); - OnWriteCompleted(writeResult.Status, writeResult.Error); + LogWriteInfo(writeResult.Status, writeResult.Error); + + if (writeResult.Error != null) + { + throw writeResult.Error; + } } if (result.IsCancelled) @@ -74,27 +76,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal _pipe.Advance(result.Buffer.End); } } - - // We're done reading - _pipe.Complete(); - - // Close the connection - _connection.Close(); } - private void OnWriteCompleted(int writeStatus, Exception writeError) + private void LogWriteInfo(int status, Exception error) { - // Called inside _contextLock - var status = writeStatus; - var error = writeError; - - if (error != null) - { - // Abort the connection for any failed write - // Queued on threadpool so get it in as first op. - _connection.AbortAsync(); - } - if (error == null) { _log.ConnectionWriteCallback(_connectionId, status); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index 12bcbdfd95..8bfaa8ba48 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -7,8 +7,10 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; using Microsoft.AspNetCore.Testing; using Xunit; @@ -316,66 +318,72 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests return 0; }; - using (var mockConnection = new MockConnection()) + var abortedSource = new CancellationTokenSource(); + + var pipeOptions = new PipeOptions { - var abortedSource = mockConnection.RequestAbortedSource; + ReaderScheduler = _libuvThread, + MaximumSizeHigh = maxResponseBufferSize, + MaximumSizeLow = maxResponseBufferSize, + }; - var pipeOptions = new PipeOptions + using (var socketOutput = CreateOutputProducer(pipeOptions, abortedSource)) + { + var bufferSize = maxResponseBufferSize - 1; + + var data = new byte[bufferSize]; + var fullBuffer = new ArraySegment(data, 0, bufferSize); + + // Act + var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + // task1 should complete successfully as < _maxBytesPreCompleted + + // First task is completed and successful + Assert.True(task1Success.IsCompleted); + Assert.False(task1Success.IsCanceled); + Assert.False(task1Success.IsFaulted); + + // following tasks should wait. + var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + + // Give time for tasks to percolate + await _mockLibuv.OnPostTask; + + // Second task is not completed + Assert.False(task2Success.IsCompleted); + Assert.False(task2Success.IsCanceled); + Assert.False(task2Success.IsFaulted); + + // Third task is not completed + Assert.False(task3Canceled.IsCompleted); + Assert.False(task3Canceled.IsCanceled); + Assert.False(task3Canceled.IsFaulted); + + // Cause all writes to fail + while (completeQueue.TryDequeue(out var triggerNextCompleted)) { - ReaderScheduler = _libuvThread, - MaximumSizeHigh = maxResponseBufferSize, - MaximumSizeLow = maxResponseBufferSize, - }; - - using (var socketOutput = CreateOutputProducer(pipeOptions, mockConnection)) - { - var bufferSize = maxResponseBufferSize - 1; - - var data = new byte[bufferSize]; - var fullBuffer = new ArraySegment(data, 0, bufferSize); - - // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); - // task1 should complete successfully as < _maxBytesPreCompleted - - // First task is completed and successful - Assert.True(task1Success.IsCompleted); - Assert.False(task1Success.IsCanceled); - Assert.False(task1Success.IsFaulted); - - // following tasks should wait. - var task2Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); - var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); - - // Give time for tasks to percolate - await _mockLibuv.OnPostTask; - - // Second task is not completed - Assert.False(task2Success.IsCompleted); - Assert.False(task2Success.IsCanceled); - Assert.False(task2Success.IsFaulted); - - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); - - // Cause all writes to fail - while (completeQueue.TryDequeue(out var triggerNextCompleted)) - { - await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted); - } - - // Second task is now completed - await task2Success.TimeoutAfter(TimeSpan.FromSeconds(5)); - - // Third task is now canceled - // TODO: Cancellation isn't supported right now - // await Assert.ThrowsAsync(() => task3Canceled); - // Assert.True(task3Canceled.IsCanceled); - - Assert.True(abortedSource.IsCancellationRequested); + await _libuvThread.PostAsync(cb => cb(-1), triggerNextCompleted); } + + // Second task is now completed + Assert.True(task2Success.IsCompleted); + Assert.False(task2Success.IsCanceled); + Assert.False(task2Success.IsFaulted); + + // A final write guarantees that the error is observed by OutputProducer, + // but doesn't return a canceled/faulted task. + var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + Assert.True(task4Success.IsCompleted); + Assert.False(task4Success.IsCanceled); + Assert.False(task4Success.IsFaulted); + + // Third task is now canceled + // TODO: Cancellation isn't supported right now + // await Assert.ThrowsAsync(() => task3Canceled); + // Assert.True(task3Canceled.IsCanceled); + + Assert.True(abortedSource.IsCancellationRequested); } } @@ -488,22 +496,50 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests - private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, MockConnection connection = null) + private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, CancellationTokenSource cts = null) { var pipe = _pipeFactory.Create(pipeOptions); var logger = new TestApplicationErrorLogger(); - var serviceContext = new TestServiceContext() { Log = new TestKestrelTrace(logger) }; - var transportContext = new TestLibuvTransportContext() { Log = new LibuvTrace(logger) }; + var serviceContext = new TestServiceContext + { + Log = new TestKestrelTrace(logger), + ThreadPool = new InlineLoggingThreadPool(new TestKestrelTrace(logger)) + }; + var transportContext = new TestLibuvTransportContext { Log = new LibuvTrace(logger) }; var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log); var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log); - var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, connection ?? new MockConnection(), "0", transportContext.Log); - var ignore = consumer.StartWrites(); + var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log); + + frame.LifetimeControl = new ConnectionLifetimeControl("0", pipe.Reader, socketOutput, serviceContext.Log); + + if (cts != null) + { + frame.RequestAborted.Register(cts.Cancel); + } + + var ignore = WriteOutputAsync(consumer, pipe.Reader); return socketOutput; } + + private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader) + { + // This WriteOutputAsync() calling code is equivalent to that in LibuvConnection. + try + { + // Ensure that outputReader.Complete() runs on the LibuvThread. + // Without ConfigureAwait(false), xunit will dispatch. + await consumer.WriteOutputAsync().ConfigureAwait(false); + outputReader.Complete(); + } + catch (UvException ex) + { + outputReader.Complete(ex); + } + } } } \ No newline at end of file diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs deleted file mode 100644 index 5e2802f976..0000000000 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnection.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; -using Microsoft.Extensions.Internal; - -namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers -{ - public class MockConnection : LibuvConnection, IDisposable - { - private readonly TaskCompletionSource _socketClosedTcs = new TaskCompletionSource(); - - public MockConnection() - { - RequestAbortedSource = new CancellationTokenSource(); - ListenerContext = new ListenerContext(new LibuvTransportContext()); - } - - public override Task AbortAsync(Exception error = null) - { - RequestAbortedSource?.Cancel(); - return TaskCache.CompletedTask; - } - - public override void Close() - { - _socketClosedTcs.SetResult(null); - } - - public CancellationTokenSource RequestAbortedSource { get; } - - public Task SocketClosed => _socketClosedTcs.Task; - - public void Dispose() - { - RequestAbortedSource.Dispose(); - } - } -}