From 7854c0604a3e6e50a0bea84c4fa9d9e6ca5d7057 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 26 Aug 2017 20:19:55 -0700 Subject: [PATCH] Remove the events on ConnectionContext (#2023) - Use the pipe events and removed the Tasks from ConnectionContext - Remove OnConnectionClosed from FrameConnection. Since the `FrameConnetion` is a single middleware, not the entire pipeline, we shouldn't need to wait on the connection close there. - It seems like the callbacks are rooted on the pipe even after they fire. This needs to be investigated in pipelines. --- src/Kestrel.Core/Internal/FrameConnection.cs | 9 ----- .../Internal/HttpConnectionMiddleware.cs | 37 +++++++++++-------- .../Internal/IConnectionHandler.cs | 1 + .../Internal/TransportConnection.Features.cs | 10 ----- .../Internal/TransportConnection.cs | 29 +-------------- .../Internal/LibuvConnection.cs | 6 --- .../SocketConnection.cs | 17 +++------ .../ConnectionContext.cs | 4 -- .../DefaultConnectionContext.cs | 10 +---- .../Features/IConnectionTransportFeature.cs | 10 +---- .../ConnectionHandlerTests.cs | 4 -- test/Kestrel.FunctionalTests/RequestTests.cs | 2 + .../TestHelpers/MockConnectionHandler.cs | 1 + 13 files changed, 34 insertions(+), 106 deletions(-) diff --git a/src/Kestrel.Core/Internal/FrameConnection.cs b/src/Kestrel.Core/Internal/FrameConnection.cs index f2e3a3e172..8171817b7a 100644 --- a/src/Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Kestrel.Core/Internal/FrameConnection.cs @@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal private readonly FrameConnectionContext _context; private IList _adaptedConnections; - private readonly TaskCompletionSource _socketClosedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private Frame _frame; private Http2Connection _http2Connection; private volatile int _http2ConnectionState; @@ -152,7 +151,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal } await adaptedPipelineTask; - await _socketClosedTcs.Task; } catch (Exception ex) { @@ -191,13 +189,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal }); } - public void OnConnectionClosed(Exception ex) - { - Abort(ex); - - _socketClosedTcs.TrySetResult(null); - } - public Task StopAsync() { Debug.Assert(_frame != null, $"{nameof(_frame)} is null"); diff --git a/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs b/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs index e79658de2c..221371535b 100644 --- a/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs +++ b/src/Kestrel.Core/Internal/HttpConnectionMiddleware.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -63,25 +64,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal var connection = new FrameConnection(frameConnectionContext); - // The order here is important, start request processing so that - // the frame is created before this yields. Events need to be wired up - // afterwards var processingTask = connection.StartRequestProcessing(_application); - // Wire up the events an forward calls to the frame connection - // It's important that these execute synchronously because graceful - // connection close is order sensative (for now) - connectionContext.ConnectionAborted.ContinueWith((task, state) => - { - // Unwrap the aggregate exception - ((FrameConnection)state).Abort(task.Exception?.InnerException); - }, - connection, TaskContinuationOptions.ExecuteSynchronously); + var inputTcs = new TaskCompletionSource(); - connectionContext.ConnectionClosed.ContinueWith((task, state) => + // Abort the frame when the transport writer completes + connectionContext.Transport.Input.OnWriterCompleted((error, state) => { - // Unwrap the aggregate exception - ((FrameConnection)state).OnConnectionClosed(task.Exception?.InnerException); + var tcs = (TaskCompletionSource)state; + + if (error != null) + { + tcs.TrySetException(error); + } + else + { + tcs.TrySetResult(null); + } + }, + inputTcs); + + inputTcs.Task.ContinueWith((task, state) => + { + ((FrameConnection)state).Abort(task.Exception?.InnerException); }, connection, TaskContinuationOptions.ExecuteSynchronously); diff --git a/src/Kestrel.Transport.Abstractions/Internal/IConnectionHandler.cs b/src/Kestrel.Transport.Abstractions/Internal/IConnectionHandler.cs index 9ae3f89c3f..41b5c57839 100644 --- a/src/Kestrel.Transport.Abstractions/Internal/IConnectionHandler.cs +++ b/src/Kestrel.Transport.Abstractions/Internal/IConnectionHandler.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Features; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal diff --git a/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.Features.cs b/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.Features.cs index 7a8cd3ce29..2ac096eb24 100644 --- a/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.Features.cs +++ b/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.Features.cs @@ -109,16 +109,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal set => Application = value; } - Task IConnectionTransportFeature.ConnectionAborted - { - get => _abortTcs.Task; - } - - Task IConnectionTransportFeature.ConnectionClosed - { - get => _closedTcs.Task; - } - object IFeatureCollection.this[Type key] { get => FastFeatureGet(key); diff --git a/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.cs b/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.cs index a447fae39c..c269a8f928 100644 --- a/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.cs +++ b/src/Kestrel.Transport.Abstractions/Internal/TransportConnection.cs @@ -1,15 +1,11 @@ using System; using System.IO.Pipelines; using System.Net; -using System.Threading.Tasks; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal { public abstract partial class TransportConnection { - private readonly TaskCompletionSource _abortTcs = new TaskCompletionSource(); - private readonly TaskCompletionSource _closedTcs = new TaskCompletionSource(); - public TransportConnection() { _currentIConnectionIdFeature = this; @@ -31,28 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal public IPipeConnection Transport { get; set; } public IPipeConnection Application { get; set; } - protected void Abort(Exception exception) - { - if (exception == null) - { - _abortTcs.TrySetResult(null); - } - else - { - _abortTcs.TrySetException(exception); - } - } - - protected void Close(Exception exception) - { - if (exception == null) - { - _closedTcs.TrySetResult(null); - } - else - { - _closedTcs.TrySetException(exception); - } - } + public IPipeWriter Input => Application.Output; + public IPipeReader Output => Application.Input; } } diff --git a/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 1ed08a309e..26f3ed4ac3 100644 --- a/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -46,9 +46,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } - public IPipeWriter Input => Application.Output; - public IPipeReader Output => Application.Input; - public LibuvOutputConsumer OutputConsumer { get; set; } private ILibuvTrace Log => ListenerContext.TransportContext.Log; @@ -83,7 +80,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal // Now, complete the input so that no more reads can happen Input.Complete(error ?? new ConnectionAbortedException()); Output.Complete(error); - Close(error); // Make sure it isn't possible for a paused read to resume reading after calling uv_close // on the stream handle @@ -178,7 +174,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } - Abort(error); // Complete after aborting the connection Input.Complete(error); } @@ -216,7 +211,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal Log.ConnectionReadFin(ConnectionId); var error = new IOException(ex.Message, ex); - Abort(error); Input.Complete(error); } } diff --git a/src/Kestrel.Transport.Sockets/SocketConnection.cs b/src/Kestrel.Transport.Sockets/SocketConnection.cs index e39e605d6a..4f11d7cd1f 100644 --- a/src/Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Kestrel.Transport.Sockets/SocketConnection.cs @@ -19,8 +19,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets private readonly Socket _socket; private readonly SocketTransport _transport; - private IPipeWriter _input; - private IPipeReader _output; private IList> _sendBufferList; private const int MinAllocBufferSize = 2048; @@ -48,9 +46,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { connectionHandler.OnConnection(this); - _input = Application.Output; - _output = Application.Input; - // Spawn send and receive logic Task receiveTask = DoReceive(); Task sendTask = DoSend(); @@ -86,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets while (true) { // Ensure we have some reasonable amount of buffer space - var buffer = _input.Alloc(MinAllocBufferSize); + var buffer = Input.Alloc(MinAllocBufferSize); try { @@ -135,8 +130,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } finally { - Abort(error); - _input.Complete(error); + Input.Complete(error); } } @@ -168,7 +162,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets while (true) { // Wait for data to write from the pipe producer - var result = await _output.ReadAsync(); + var result = await Output.ReadAsync(); var buffer = result.Buffer; if (result.IsCancelled) @@ -205,7 +199,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } finally { - _output.Advance(buffer.End); + Output.Advance(buffer.End); } } @@ -229,8 +223,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets } finally { - Close(error); - _output.Complete(error); + Output.Complete(error); } } diff --git a/src/Protocols.Abstractions/ConnectionContext.cs b/src/Protocols.Abstractions/ConnectionContext.cs index c10a884f48..f2ddc754ae 100644 --- a/src/Protocols.Abstractions/ConnectionContext.cs +++ b/src/Protocols.Abstractions/ConnectionContext.cs @@ -15,9 +15,5 @@ namespace Microsoft.AspNetCore.Protocols public abstract IPipeConnection Transport { get; set; } public abstract PipeFactory PipeFactory { get; } - - public abstract Task ConnectionAborted { get; } - - public abstract Task ConnectionClosed { get; } } } diff --git a/src/Protocols.Abstractions/DefaultConnectionContext.cs b/src/Protocols.Abstractions/DefaultConnectionContext.cs index 814d4470f7..b81fc16f06 100644 --- a/src/Protocols.Abstractions/DefaultConnectionContext.cs +++ b/src/Protocols.Abstractions/DefaultConnectionContext.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Text; -using System.Threading.Tasks; +using System.IO.Pipelines; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Protocols.Features; @@ -39,10 +35,6 @@ namespace Microsoft.AspNetCore.Protocols set => ConnectionTransportFeature.Transport = value; } - public override Task ConnectionAborted => ConnectionTransportFeature.ConnectionAborted; - - public override Task ConnectionClosed => ConnectionTransportFeature.ConnectionClosed; - struct FeatureInterfaces { public IConnectionIdFeature ConnectionId; diff --git a/src/Protocols.Abstractions/Features/IConnectionTransportFeature.cs b/src/Protocols.Abstractions/Features/IConnectionTransportFeature.cs index e9cccb4a3e..8f27924126 100644 --- a/src/Protocols.Abstractions/Features/IConnectionTransportFeature.cs +++ b/src/Protocols.Abstractions/Features/IConnectionTransportFeature.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Text; -using System.Threading.Tasks; +using System.IO.Pipelines; namespace Microsoft.AspNetCore.Protocols.Features { @@ -17,9 +13,5 @@ namespace Microsoft.AspNetCore.Protocols.Features IScheduler InputWriterScheduler { get; } IScheduler OutputReaderScheduler { get; } - - Task ConnectionAborted { get; } - - Task ConnectionClosed { get; } } } diff --git a/test/Kestrel.Core.Tests/ConnectionHandlerTests.cs b/test/Kestrel.Core.Tests/ConnectionHandlerTests.cs index b21387675e..45450bf9bd 100644 --- a/test/Kestrel.Core.Tests/ConnectionHandlerTests.cs +++ b/test/Kestrel.Core.Tests/ConnectionHandlerTests.cs @@ -60,10 +60,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests public IScheduler OutputReaderScheduler => TaskRunScheduler.Default; - public Task ConnectionAborted => Task.CompletedTask; - - public Task ConnectionClosed => Task.CompletedTask; - public string ConnectionId { get; set; } } } diff --git a/test/Kestrel.FunctionalTests/RequestTests.cs b/test/Kestrel.FunctionalTests/RequestTests.cs index 3a55e9a4cf..281e5e057f 100644 --- a/test/Kestrel.FunctionalTests/RequestTests.cs +++ b/test/Kestrel.FunctionalTests/RequestTests.cs @@ -990,6 +990,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests public async Task ConnectionClosesWhenFinReceivedBeforeRequestCompletes(ListenOptions listenOptions) { var testContext = new TestServiceContext(); + // FIN callbacks are scheduled so run inline to make this test more reliable + testContext.ThreadPool = new InlineLoggingThreadPool(testContext.Log); using (var server = new TestServer(TestApp.EchoAppChunked, testContext, listenOptions)) { diff --git a/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs b/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs index ef856124fe..f1615378ee 100644 --- a/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs +++ b/test/Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs @@ -3,6 +3,7 @@ using System; using System.IO.Pipelines; +using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Protocols; using Microsoft.AspNetCore.Protocols.Features;