From 5e7f63b096e426d293008e7b1093c271f88962ed Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 7 Aug 2018 21:39:09 -0700 Subject: [PATCH] Make un-graceful shutdown a bit more graceful (#2748) * Make un-graceful shutdown a bit more graceful - Change how graceful shutdown is done to ensure there are no errors on shutdown. - Avoid closing pipes from under other components that own those pipes. - Close the application output so that the application's read loop ends gracefully. For websockets, we cancel the token, for other transports, we close the output directly. --- .../Internal/HttpConnectionContext.cs | 97 ++++++++----------- .../Internal/HttpConnectionDispatcher.cs | 3 + .../Transports/WebSocketsTransport.cs | 14 +-- .../HttpConnectionManagerTests.cs | 12 ++- 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs index 045e821ee1..3b31756d86 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionContext.cs @@ -182,8 +182,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal { Task disposeTask; - Cancellation?.Dispose(); - await StateLock.WaitAsync(); try { @@ -206,6 +204,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal finally { StateLock.Release(); + + Cancellation?.Dispose(); + + Cancellation = null; } await disposeTask; @@ -218,78 +220,65 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal // Closing gracefully means we're only going to close the finished sides of the pipe // If the application finishes, that means it's done with the transport pipe // If the transport finishes, that means it's done with the application pipe - if (closeGracefully) + if (!closeGracefully) { - // Wait for either to finish - var result = await Task.WhenAny(applicationTask, transportTask); + Application?.Output.CancelPendingFlush(); - // If the application is complete, complete the transport pipe (it's the pipe to the transport) - if (result == applicationTask) + if (TransportType == HttpTransportType.WebSockets) { - Transport?.Output.Complete(applicationTask.Exception?.InnerException); - Transport?.Input.Complete(); - - try - { - Log.WaitingForTransport(_logger, TransportType); - - // Transports are written by us and are well behaved, wait for them to drain - await transportTask; - } - finally - { - Log.TransportComplete(_logger, TransportType); - - // Now complete the application - Application?.Output.Complete(); - Application?.Input.Complete(); - } + // The websocket transport will close the application output automatically when reading is cancelled + Cancellation?.Cancel(); } else { - // If the transport is complete, complete the application pipes - Application?.Output.Complete(transportTask.Exception?.InnerException); + // The other transports don't close their own output, so we can do it here safely + Application?.Output.Complete(); + } + } + + // Wait for either to finish + var result = await Task.WhenAny(applicationTask, transportTask); + + // If the application is complete, complete the transport pipe (it's the pipe to the transport) + if (result == applicationTask) + { + Transport?.Output.Complete(applicationTask.Exception?.InnerException); + Transport?.Input.Complete(); + + try + { + Log.WaitingForTransport(_logger, TransportType); + + // Transports are written by us and are well behaved, wait for them to drain + await transportTask; + } + finally + { + Log.TransportComplete(_logger, TransportType); + + // Now complete the application + Application?.Output.Complete(); Application?.Input.Complete(); - - try - { - // A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak - Log.WaitingForApplication(_logger); - - await applicationTask; - } - finally - { - Log.ApplicationComplete(_logger); - - Transport?.Output.Complete(); - Transport?.Input.Complete(); - } } } else { - Log.ShuttingDownTransportAndApplication(_logger, TransportType); - - // Cancel any pending flushes from back pressure - Application?.Output.CancelPendingFlush(); - - // Shutdown both sides and wait for nothing - Transport?.Output.Complete(applicationTask.Exception?.InnerException); + // If the transport is complete, complete the application pipes Application?.Output.Complete(transportTask.Exception?.InnerException); + Application?.Input.Complete(); try { - Log.WaitingForTransportAndApplication(_logger, TransportType); // A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak - await Task.WhenAll(applicationTask, transportTask); + Log.WaitingForApplication(_logger); + + await applicationTask; } finally { - Log.TransportAndApplicationComplete(_logger, TransportType); + Log.ApplicationComplete(_logger); - // Close the reading side after both sides run - Application?.Input.Complete(); + Transport?.Output.Complete(); Transport?.Input.Complete(); } } diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs index e13b532e16..d66c8ae8b3 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs @@ -166,6 +166,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal Log.EstablishedConnection(_logger); + // Allow the reads to be cancelled + connection.Cancellation = new CancellationTokenSource(); + var ws = new WebSocketsTransport(options.WebSockets, connection.Application, connection, _loggerFactory); await DoPersistentConnection(connectionDelegate, ws, context, connection); diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/Transports/WebSocketsTransport.cs index 0312f5488b..f7700701e9 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/Transports/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/Transports/WebSocketsTransport.cs @@ -138,13 +138,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports private async Task StartReceiving(WebSocket socket) { + var token = _connection.Cancellation?.Token ?? default; + try { - while (true) + while (!token.IsCancellationRequested) { #if NETCOREAPP2_2 // Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read - var result = await socket.ReceiveAsync(Memory.Empty, CancellationToken.None); + var result = await socket.ReceiveAsync(Memory.Empty, token); if (result.MessageType == WebSocketMessageType.Close) { @@ -154,15 +156,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports var memory = _application.Output.GetMemory(); #if NETCOREAPP2_2 - var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None); + var receiveResult = await socket.ReceiveAsync(memory, token); #else var isArray = MemoryMarshal.TryGetArray(memory, out var arraySegment); Debug.Assert(isArray); // Exceptions are handled above where the send and receive tasks are being run. - var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None); + var receiveResult = await socket.ReceiveAsync(arraySegment, token); #endif - // Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read + // Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read if (receiveResult.MessageType == WebSocketMessageType.Close) { return; @@ -193,7 +195,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports } catch (Exception ex) { - if (!_aborted) + if (!_aborted && !token.IsCancellationRequested) { _application.Output.Complete(ex); diff --git a/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionManagerTests.cs b/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionManagerTests.cs index 820d28749d..238cbe75ad 100644 --- a/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionManagerTests.cs +++ b/test/Microsoft.AspNetCore.Http.Connections.Tests/HttpConnectionManagerTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.IO.Pipelines; using System.Threading.Tasks; using Microsoft.AspNetCore.Hosting; @@ -96,9 +97,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests try { - await connection.DisposeAsync(closeGracefully); + await connection.DisposeAsync(closeGracefully).OrTimeout(); } - catch + catch (Exception ex) when (!(ex is TimeoutException)) { // Ignore the exception that bubbles out of the failing task } @@ -166,6 +167,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests try { Assert.True(result.IsCompleted); + + // We should be able to write + await connection.Transport.Output.WriteAsync(new byte[] { 1 }); } finally { @@ -176,6 +180,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests connection.TransportTask = Task.Run(async () => { var result = await connection.Application.Input.ReadAsync(); + Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray()); + connection.Application.Input.AdvanceTo(result.Buffer.End); + + result = await connection.Application.Input.ReadAsync(); try { Assert.True(result.IsCompleted);