From e6a88c1b9c44a62f7d96437527f55e88aacdce26 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Thu, 10 May 2018 17:13:57 -0700 Subject: [PATCH] Relieve response backpressure immediately when closing socket (#2557) * Relieve response backpressure immediately when closing socket --- .../Internal/SocketConnection.cs | 44 ++---- test/Kestrel.FunctionalTests/ResponseTests.cs | 131 ++++++++++++++++-- test/shared/TestConnection.cs | 2 + 3 files changed, 135 insertions(+), 42 deletions(-) diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs index 8f39a08540..d410599663 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs @@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal internal sealed class SocketConnection : TransportConnection { private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2; - private static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); private readonly Socket _socket; private readonly PipeScheduler _scheduler; @@ -55,11 +54,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal ConnectionClosed = _connectionClosedTokenSource.Token; - // On *nix platforms, Sockets already dispatches to the ThreadPool. - var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline; - - _receiver = new SocketReceiver(_socket, awaiterScheduler); - _sender = new SocketSender(_socket, awaiterScheduler); + _receiver = new SocketReceiver(_socket, _scheduler); + _sender = new SocketSender(_socket, _scheduler); } public override MemoryPool MemoryPool { get; } @@ -69,30 +65,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal public async Task StartAsync(IConnectionDispatcher connectionDispatcher) { - Exception sendError = null; try { connectionDispatcher.OnConnection(this); // Spawn send and receive logic - Task receiveTask = DoReceive(); - Task sendTask = DoSend(); - - // If the sending task completes then close the receive - // We don't need to do this in the other direction because the kestrel - // will trigger the output closing once the input is complete. - if (await Task.WhenAny(receiveTask, sendTask) == sendTask) - { - // Tell the reader it's being aborted - _socket.Dispose(); - } + var receiveTask = DoReceive(); + var sendTask = DoSend(); // Now wait for both to complete await receiveTask; - sendError = await sendTask; + await sendTask; - // Dispose the socket(should noop if already called) - _socket.Dispose(); _receiver.Dispose(); _sender.Dispose(); ThreadPool.QueueUserWorkItem(state => ((SocketConnection)state).CancelConnectionClosedToken(), this); @@ -101,18 +85,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}."); } - finally - { - // Complete the output after disposing the socket - Output.Complete(sendError); - } } public override void Abort() { // Try to gracefully close the socket to match libuv behavior. Shutdown(); - _socket.Dispose(); } private async Task DoReceive() @@ -207,7 +185,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal } } - private async Task DoSend() + private async Task DoSend() { Exception error = null; @@ -231,10 +209,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { error = new IOException(ex.Message, ex); } + finally + { + Shutdown(); - Shutdown(); - - return error; + // Complete the output after disposing the socket + Output.Complete(error); + } } private async Task ProcessSends() @@ -284,6 +265,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal // Try to gracefully close the socket even for aborts to match libuv behavior. _socket.Shutdown(SocketShutdown.Both); + _socket.Dispose(); } } } diff --git a/test/Kestrel.FunctionalTests/ResponseTests.cs b/test/Kestrel.FunctionalTests/ResponseTests.cs index bc940be76b..72fe4d1ba2 100644 --- a/test/Kestrel.FunctionalTests/ResponseTests.cs +++ b/test/Kestrel.FunctionalTests/ResponseTests.cs @@ -2642,6 +2642,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests var responseRateTimeoutMessageLogged = new TaskCompletionSource(); var connectionStopMessageLogged = new TaskCompletionSource(); var requestAborted = new TaskCompletionSource(); + var appFuncCompleted = new TaskCompletionSource(); var mockKestrelTrace = new Mock(); mockKestrelTrace @@ -2649,7 +2650,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests .Callback(() => responseRateTimeoutMessageLogged.SetResult(null)); mockKestrelTrace .Setup(trace => trace.ConnectionStop(It.IsAny())) - .Callback(() => connectionStopMessageLogged.SetResult(null));; + .Callback(() => connectionStopMessageLogged.SetResult(null)); var testContext = new TestServiceContext { @@ -2660,7 +2661,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests { Limits = { - MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2)) + MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2)) } } }; @@ -2676,10 +2677,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests context.Response.ContentLength = responseSize; - for (var i = 0; i < chunks; i++) + try { - await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted); - appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize); + for (var i = 0; i < chunks; i++) + { + await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted); + appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize); + } + } + catch (OperationCanceledException) + { + appFuncCompleted.SetResult(null); + throw; } } @@ -2702,7 +2711,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60)); await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); - + await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); await AssertStreamAborted(connection.Reader.BaseStream, chunkSize * chunks); sw.Stop(); @@ -2724,6 +2733,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests var responseRateTimeoutMessageLogged = new TaskCompletionSource(); var connectionStopMessageLogged = new TaskCompletionSource(); var aborted = new TaskCompletionSource(); + var appFuncCompleted = new TaskCompletionSource(); var mockKestrelTrace = new Mock(); mockKestrelTrace @@ -2740,7 +2750,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests { Limits = { - MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2)) + MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2)) } } }; @@ -2762,9 +2772,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests context.Response.ContentLength = chunks * chunkSize; - for (var i = 0; i < chunks; i++) + try { - await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted); + for (var i = 0; i < chunks; i++) + { + await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted); + } + } + catch (OperationCanceledException) + { + appFuncCompleted.SetResult(null); + throw; } }, testContext, listenOptions)) { @@ -2780,6 +2798,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests await aborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60)); await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); + await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); // Temporary workaround for a deadlock when reading from an aborted client SslStream on Mac and Linux. if (TestPlatformHelper.IsWindows) @@ -2795,6 +2814,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests } } + [Fact] + public async Task ConnectionClosedWhenBothRequestAndResponseExperienceBackPressure() + { + const int bufferSize = 1024; + const int bufferCount = 256 * 1024; + var responseSize = bufferCount * bufferSize; + var buffer = new byte[bufferSize]; + + var responseRateTimeoutMessageLogged = new TaskCompletionSource(); + var connectionStopMessageLogged = new TaskCompletionSource(); + var requestAborted = new TaskCompletionSource(); + var appFuncCompleted = new TaskCompletionSource(); + + var mockKestrelTrace = new Mock(); + mockKestrelTrace + .Setup(trace => trace.ResponseMininumDataRateNotSatisfied(It.IsAny(), It.IsAny())) + .Callback(() => responseRateTimeoutMessageLogged.SetResult(null)); + mockKestrelTrace + .Setup(trace => trace.ConnectionStop(It.IsAny())) + .Callback(() => connectionStopMessageLogged.SetResult(null)); + + var testContext = new TestServiceContext + { + LoggerFactory = LoggerFactory, + Log = mockKestrelTrace.Object, + SystemClock = new SystemClock(), + ServerOptions = + { + Limits = + { + MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2)), + MaxRequestBodySize = responseSize + } + } + }; + + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)); + + async Task App(HttpContext context) + { + context.RequestAborted.Register(() => + { + requestAborted.SetResult(null); + }); + + try + { + await context.Request.Body.CopyToAsync(context.Response.Body); + } + catch + { + // This should always throw an OperationCanceledException. Unfortunately a ECONNRESET UvException sometimes gets thrown. + // This will be fixed by https://github.com/aspnet/KestrelHttpServer/pull/2547 + appFuncCompleted.SetResult(null); + throw; + } + } + + using (var server = new TestServer(App, testContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + // Close the connection with the last request so AssertStreamCompleted actually completes. + await connection.Send( + "POST / HTTP/1.1", + "Host:", + $"Content-Length: {responseSize}", + "", + ""); + + var sendTask = Task.Run(async () => + { + for (var i = 0; i < bufferCount; i++) + { + await connection.Stream.WriteAsync(buffer, 0, buffer.Length); + } + }); + + await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60)); + await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); + await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); + await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); + await AssertStreamAborted(connection.Stream, responseSize); + } + } + } + [Fact] public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseChunks() { @@ -2803,6 +2909,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests var chunkData = new byte[chunkSize]; var requestAborted = false; + var appFuncCompleted = new TaskCompletionSource(); var mockKestrelTrace = new Mock(); var testContext = new TestServiceContext @@ -2831,6 +2938,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests { await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted); } + + appFuncCompleted.SetResult(null); } using (var server = new TestServer(App, testContext, listenOptions)) @@ -2850,6 +2959,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests // Make sure consuming a single chunk exceeds the 2 second timeout. var targetBytesPerSecond = chunkSize / 4; await AssertStreamCompleted(connection.Reader.BaseStream, minTotalOutputSize, targetBytesPerSecond); + await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10)); mockKestrelTrace.Verify(t => t.ResponseMininumDataRateNotSatisfied(It.IsAny(), It.IsAny()), Times.Never()); mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny()), Times.Once()); @@ -2859,7 +2969,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests } [Fact] - public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders() + public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders() { var headerSize = 1024 * 1024; // 1 MB for each header value var headerCount = 64; // 64 MB of headers per response @@ -2933,7 +3043,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests } } - [Fact] public async Task NonZeroContentLengthFor304StatusCodeIsAllowed() { diff --git a/test/shared/TestConnection.cs b/test/shared/TestConnection.cs index fc6458c287..4f65da6bcf 100644 --- a/test/shared/TestConnection.cs +++ b/test/shared/TestConnection.cs @@ -49,6 +49,8 @@ namespace Microsoft.AspNetCore.Testing public Socket Socket => _socket; + public Stream Stream => _stream; + public StreamReader Reader => _reader; public void Dispose()