diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index 99bf9346d9..25e44c1df4 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -87,8 +87,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } } - public async void OnConnectionClosed() + public async void OnConnectionClosed(Exception ex) { + // Abort the connection (if it isn't already aborted) + _frame.Abort(ex); + Log.ConnectionStop(ConnectionId); KestrelEventSource.Log.ConnectionStop(this); _socketClosedTcs.SetResult(null); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index 5e94d3e47f..c5fe3dee51 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -31,6 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // this is temporary until it does private TaskCompletionSource _flushTcs; private readonly object _flushLock = new object(); + private Action _flushCompleted; public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log) { @@ -38,6 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _frame = frame; _connectionId = connectionId; _log = log; + _flushCompleted = OnFlushCompleted; } public Task WriteAsync( @@ -83,8 +85,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http var awaitable = writableBuffer.FlushAsync(cancellationToken); if (awaitable.IsCompleted) { - AbortIfNeeded(awaitable); - // The flush task can't fail today return TaskCache.CompletedTask; } @@ -103,27 +103,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { _flushTcs = new TaskCompletionSource(); - awaitable.OnCompleted(() => - { - AbortIfNeeded(awaitable); - _flushTcs.TrySetResult(null); - }); + awaitable.OnCompleted(_flushCompleted); } } await _flushTcs.Task; + + if (cancellationToken.IsCancellationRequested) + { + _frame.Abort(error: null); + } + cancellationToken.ThrowIfCancellationRequested(); } - private void AbortIfNeeded(WritableBufferAwaitable awaitable) + private void OnFlushCompleted() { - try - { - awaitable.GetResult(); - } - catch (Exception ex) - { - _frame.Abort(ex); - } + _flushTcs.TrySetResult(null); } void ISocketOutput.Write(ArraySegment buffer, bool chunk) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs index 84e491fed1..d05110b930 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions/IConnectionContext.cs @@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions IPipeWriter Input { get; } IPipeReader Output { get; } - // TODO: Remove these (Use Pipes Tasks instead?) - void OnConnectionClosed(); + // TODO: Remove these (https://github.com/aspnet/KestrelHttpServer/issues/1772) + void OnConnectionClosed(Exception ex); void Abort(Exception ex); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 0bfc0446c8..d9dac68dd3 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -66,17 +66,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // Start socket prior to applying the ConnectionAdapter StartReading(); + Exception error = null; + try { // This *must* happen after socket.ReadStart // The socket output consumer is the only thing that can close the connection. If the // output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards. await Output.WriteOutputAsync(); - _connectionContext.Output.Complete(); } catch (UvException ex) { - _connectionContext.Output.Complete(ex); + error = new IOException(ex.Message, ex); } finally { @@ -91,7 +92,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal _socket.Dispose(); // Tell the kestrel we're done with this connection - _connectionContext.OnConnectionClosed(); + _connectionContext.OnConnectionClosed(error); + _connectionContext.Output.Complete(error); } } catch (Exception e) @@ -221,7 +223,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected). // This should be treated the same as OnRead() seeing a "normalDone" condition. Log.ConnectionReadFin(ConnectionId); - Input.Complete(new IOException(ex.Message, ex)); + var error = new IOException(ex.Message, ex); + + _connectionContext.Abort(error); + Input.Complete(error); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs index 6c9d0cfcf5..0cec24835e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnectionContext.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.Net; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs index 7a1dab14c5..025c4188f8 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs @@ -7,7 +7,6 @@ using System.Diagnostics; using System.IO; using System.Net; using System.Net.Sockets; -using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; @@ -72,15 +71,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { // TODO: Log } - finally - { - // Mark the connection as closed after disposal - _connectionContext.OnConnectionClosed(); - } } private async Task DoReceive() { + Exception error = null; + try { while (true) @@ -112,40 +108,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets break; } } - - _connectionContext.Abort(ex: null); - _input.Complete(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) + { + error = new ConnectionResetException(ex.Message, ex); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = new TaskCanceledException("The request was aborted"); + } + catch (ObjectDisposedException) + { + error = new TaskCanceledException("The request was aborted"); + } + catch (IOException ex) + { + error = ex; } catch (Exception ex) { - Exception error = null; - - if (ex is SocketException se) - { - if (se.SocketErrorCode == SocketError.ConnectionReset) - { - // Connection reset - error = new ConnectionResetException(ex.Message, ex); - } - else if (se.SocketErrorCode == SocketError.OperationAborted) - { - error = new TaskCanceledException("The request was aborted"); - } - } - - if (ex is ObjectDisposedException) - { - error = new TaskCanceledException("The request was aborted"); - } - else if (ex is IOException ioe) - { - error = ioe; - } - else if (error == null) - { - error = new IOException(ex.Message, ex); - } - + error = new IOException(ex.Message, ex); + } + finally + { _connectionContext.Abort(error); _input.Complete(error); } @@ -172,6 +157,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets private async Task DoSend() { + Exception error = null; + try { while (true) @@ -220,13 +207,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets _output.Advance(buffer.End); } } - - // We're done reading - _output.Complete(); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted) + { + error = null; + } + catch (ObjectDisposedException) + { + error = null; + } + catch (IOException ex) + { + error = ex; } catch (Exception ex) { - _output.Complete(ex); + error = new IOException(ex.Message, ex); + } + finally + { + _connectionContext.OnConnectionClosed(error); + _output.Complete(error); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs index 6928a6f70b..2cec8398a0 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Mocks/MockConnectionInformation.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; using System.Net; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index 3d8fa5620c..bbaaea6054 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -9,10 +9,12 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; using Microsoft.AspNetCore.Testing; +using Moq; using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests @@ -282,11 +284,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.NotEmpty(completeQueue); // Add more bytes to the write-behind buffer to prevent the next write from - ((ISocketOutput) socketOutput).Write((writableBuffer, state) => - { - writableBuffer.Write(state); - }, - halfWriteBehindBuffer); + ((ISocketOutput)socketOutput).Write((writableBuffer, state) => + { + writableBuffer.Write(state); + }, + halfWriteBehindBuffer); // Act var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); @@ -679,12 +681,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests frame.RequestAborted.Register(cts.Cancel); } - var ignore = WriteOutputAsync(consumer, pipe.Reader); + var ignore = WriteOutputAsync(consumer, pipe.Reader, frame); return socketOutput; } - private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader) + private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader, Frame frame) { // This WriteOutputAsync() calling code is equivalent to that in LibuvConnection. try @@ -692,10 +694,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // Ensure that outputReader.Complete() runs on the LibuvThread. // Without ConfigureAwait(false), xunit will dispatch. await consumer.WriteOutputAsync().ConfigureAwait(false); + + frame.Abort(error: null); outputReader.Complete(); } catch (UvException ex) { + frame.Abort(ex); outputReader.Complete(ex); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs index 0ec0a04f12..6e8c8b312e 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs @@ -35,12 +35,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers public IPipeWriter Input { get; set; } public IPipeReader Output { get; set; } - public void OnConnectionClosed() + public void Abort(Exception ex) { - } - public void Abort(Exception ex) + public void OnConnectionClosed(Exception ex) { } }