From 749e282102787fef75c5ed3a6830e59871c13d1f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 29 Apr 2017 00:41:48 -0700 Subject: [PATCH] Prepare for OnReader/WriterCallbacks changes (#1791) - This change does a few things: 1. It adds the events we will replace with pipe events to IConnectionContext and IConnectionInformation to get out of band notifications about pipe completions. 2. It also implements those callbacks and exposing slight changes we'll need to make once we have them. The idea is that we can delete/replace these methods once we have the new pipe API and things will keep working. --- .../Internal/FrameConnection.cs | 5 +- .../Internal/Http/OutputProducer.cs | 27 +++--- .../IConnectionContext.cs | 4 +- .../Internal/LibuvConnection.cs | 13 ++- .../Internal/LibuvConnectionContext.cs | 1 + .../SocketConnection.cs | 83 ++++++++++--------- .../Mocks/MockConnectionInformation.cs | 1 + .../LibuvOutputConsumerTests.cs | 19 +++-- .../TestHelpers/MockConnectionHandler.cs | 5 +- 9 files changed, 84 insertions(+), 74 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index 99bf9346d9..25e44c1df4 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -87,8 +87,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } } - public async void OnConnectionClosed() + public async void OnConnectionClosed(Exception ex) { + // Abort the connection (if it isn't already aborted) + _frame.Abort(ex); + Log.ConnectionStop(ConnectionId); KestrelEventSource.Log.ConnectionStop(this); _socketClosedTcs.SetResult(null); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index 5e94d3e47f..c5fe3dee51 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -31,6 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // this is temporary until it does private TaskCompletionSource _flushTcs; private readonly object _flushLock = new object(); + private Action _flushCompleted; public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log) { @@ -38,6 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _frame = frame; _connectionId = connectionId; _log = log; + _flushCompleted = OnFlushCompleted; } public Task WriteAsync( @@ -83,8 +85,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http var awaitable = writableBuffer.FlushAsync(cancellationToken); if (awaitable.IsCompleted) { - AbortIfNeeded(awaitable); - // The flush task can't fail today return TaskCache.CompletedTask; } @@ -103,27 +103,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { _flushTcs = new TaskCompletionSource(); - awaitable.OnCompleted(() => - { - AbortIfNeeded(awaitable); - _flushTcs.TrySetResult(null); - }); + awaitable.OnCompleted(_flushCompleted); } } await _flushTcs.Task; + + if (cancellationToken.IsCancellationRequested) + { + _frame.Abort(error: null); + } + cancellationToken.ThrowIfCancellationRequested(); } - private void AbortIfNeeded(WritableBufferAwaitable awaitable) + private void OnFlushCompleted() { - try - { - awaitable.GetResult(); - } - catch (Exception ex) - { - _frame.Abort(ex); - } + _flushTcs.TrySetResult(null); } void ISocketOutput.Write(ArraySegment buffer, bool chunk) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs index 84e491fed1..d05110b930 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs @@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions IPipeWriter Input { get; } IPipeReader Output { get; } - // TODO: Remove these (Use Pipes Tasks instead?) - void OnConnectionClosed(); + // TODO: Remove these (https://github.com/aspnet/KestrelHttpServer/issues/1772) + void OnConnectionClosed(Exception ex); void Abort(Exception ex); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 0bfc0446c8..d9dac68dd3 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -66,17 +66,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // Start socket prior to applying the ConnectionAdapter StartReading(); + Exception error = null; + try { // This *must* happen after socket.ReadStart // The socket output consumer is the only thing that can close the connection. If the // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards. await Output.WriteOutputAsync(); - _connectionContext.Output.Complete(); } catch (UvException ex) { - _connectionContext.Output.Complete(ex); + error = new IOException(ex.Message, ex); } finally { @@ -91,7 +92,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal _socket.Dispose(); // Tell the kestrel we're done with this connection - _connectionContext.OnConnectionClosed(); + _connectionContext.OnConnectionClosed(error); + _connectionContext.Output.Complete(error); } } catch (Exception e) @@ -221,7 +223,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected). // This should be treated the same as OnRead() seeing a "normalDone" condition. Log.ConnectionReadFin(ConnectionId); - Input.Complete(new IOException(ex.Message, ex)); + var error = new IOException(ex.Message, ex); + + _connectionContext.Abort(error); + Input.Complete(error); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs index 6c9d0cfcf5..0cec24835e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.Net; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs index 7a1dab14c5..025c4188f8 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs @@ -7,7 +7,6 @@ using System.Diagnostics; using System.IO; using System.Net; using System.Net.Sockets; -using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; @@ -72,15 +71,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { // TODO: Log } - finally - { - // Mark the connection as closed after disposal - _connectionContext.OnConnectionClosed(); - } } private async Task DoReceive() { + Exception error = null; + try { while (true) @@ -112,40 +108,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets break; } } - - _connectionContext.Abort(ex: null); - _input.Complete(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) + { + error = new ConnectionResetException(ex.Message, ex); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = new TaskCanceledException("The request was aborted"); + } + catch (ObjectDisposedException) + { + error = new TaskCanceledException("The request was aborted"); + } + catch (IOException ex) + { + error = ex; } catch (Exception ex) { - Exception error = null; - - if (ex is SocketException se) - { - if (se.SocketErrorCode == SocketError.ConnectionReset) - { - // Connection reset - error = new ConnectionResetException(ex.Message, ex); - } - else if (se.SocketErrorCode == SocketError.OperationAborted) - { - error = new TaskCanceledException("The request was aborted"); - } - } - - if (ex is ObjectDisposedException) - { - error = new TaskCanceledException("The request was aborted"); - } - else if (ex is IOException ioe) - { - error = ioe; - } - else if (error == null) - { - error = new IOException(ex.Message, ex); - } - + error = new IOException(ex.Message, ex); + } + finally + { _connectionContext.Abort(error); _input.Complete(error); } @@ -172,6 +157,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets private async Task DoSend() { + Exception error = null; + try { while (true) @@ -220,13 +207,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets _output.Advance(buffer.End); } } - - // We're done reading - _output.Complete(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = null; + } + catch (ObjectDisposedException) + { + error = null; + } + catch (IOException ex) + { + error = ex; } catch (Exception ex) { - _output.Complete(ex); + error = new IOException(ex.Message, ex); + } + finally + { + _connectionContext.OnConnectionClosed(error); + _output.Complete(error); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs index 6928a6f70b..2cec8398a0 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.Net; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index 3d8fa5620c..bbaaea6054 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -9,10 +9,12 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; using Microsoft.AspNetCore.Testing; +using Moq; using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests @@ -282,11 +284,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.NotEmpty(completeQueue); // Add more bytes to the write-behind buffer to prevent the next write from - ((ISocketOutput) socketOutput).Write((writableBuffer, state) => - { - writableBuffer.Write(state); - }, - halfWriteBehindBuffer); + ((ISocketOutput)socketOutput).Write((writableBuffer, state) => + { + writableBuffer.Write(state); + }, + halfWriteBehindBuffer); // Act var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); @@ -679,12 +681,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests frame.RequestAborted.Register(cts.Cancel); } - var ignore = WriteOutputAsync(consumer, pipe.Reader); + var ignore = WriteOutputAsync(consumer, pipe.Reader, frame); return socketOutput; } - private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader) + private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader, Frame frame) { // This WriteOutputAsync() calling code is equivalent to that in LibuvConnection. try @@ -692,10 +694,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // Ensure that outputReader.Complete() runs on the LibuvThread. // Without ConfigureAwait(false), xunit will dispatch. await consumer.WriteOutputAsync().ConfigureAwait(false); + + frame.Abort(error: null); outputReader.Complete(); } catch (UvException ex) { + frame.Abort(ex); outputReader.Complete(ex); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs index 0ec0a04f12..6e8c8b312e 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs @@ -35,12 +35,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers public IPipeWriter Input { get; set; } public IPipeReader Output { get; set; } - public void OnConnectionClosed() + public void Abort(Exception ex) { - } - public void Abort(Exception ex) + public void OnConnectionClosed(Exception ex) { } }