Make un-graceful shutdown a bit more graceful (#2748)

* Make un-graceful shutdown a bit more graceful
- Change how graceful shutdown is done to ensure there are no errors on shutdown.
- Avoid closing pipes from under other components that own those pipes.
- Close the application output so that the application's read loop ends gracefully. For websockets, we cancel the token, for other transports, we close the output directly.
This commit is contained in:
David Fowler 2018-08-07 21:39:09 -07:00 committed by GitHub
parent fddf341913
commit 5e7f63b096
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 62 deletions

View File

@ -182,8 +182,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
{
Task disposeTask;
Cancellation?.Dispose();
await StateLock.WaitAsync();
try
{
@ -206,6 +204,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
finally
{
StateLock.Release();
Cancellation?.Dispose();
Cancellation = null;
}
await disposeTask;
@ -218,78 +220,65 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
// Closing gracefully means we're only going to close the finished sides of the pipe
// If the application finishes, that means it's done with the transport pipe
// If the transport finishes, that means it's done with the application pipe
if (closeGracefully)
if (!closeGracefully)
{
// Wait for either to finish
var result = await Task.WhenAny(applicationTask, transportTask);
Application?.Output.CancelPendingFlush();
// If the application is complete, complete the transport pipe (it's the pipe to the transport)
if (result == applicationTask)
if (TransportType == HttpTransportType.WebSockets)
{
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Transport?.Input.Complete();
try
{
Log.WaitingForTransport(_logger, TransportType);
// Transports are written by us and are well behaved, wait for them to drain
await transportTask;
}
finally
{
Log.TransportComplete(_logger, TransportType);
// Now complete the application
Application?.Output.Complete();
Application?.Input.Complete();
}
// The websocket transport will close the application output automatically when reading is cancelled
Cancellation?.Cancel();
}
else
{
// If the transport is complete, complete the application pipes
Application?.Output.Complete(transportTask.Exception?.InnerException);
// The other transports don't close their own output, so we can do it here safely
Application?.Output.Complete();
}
}
// Wait for either to finish
var result = await Task.WhenAny(applicationTask, transportTask);
// If the application is complete, complete the transport pipe (it's the pipe to the transport)
if (result == applicationTask)
{
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Transport?.Input.Complete();
try
{
Log.WaitingForTransport(_logger, TransportType);
// Transports are written by us and are well behaved, wait for them to drain
await transportTask;
}
finally
{
Log.TransportComplete(_logger, TransportType);
// Now complete the application
Application?.Output.Complete();
Application?.Input.Complete();
try
{
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
Log.WaitingForApplication(_logger);
await applicationTask;
}
finally
{
Log.ApplicationComplete(_logger);
Transport?.Output.Complete();
Transport?.Input.Complete();
}
}
}
else
{
Log.ShuttingDownTransportAndApplication(_logger, TransportType);
// Cancel any pending flushes from back pressure
Application?.Output.CancelPendingFlush();
// Shutdown both sides and wait for nothing
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
// If the transport is complete, complete the application pipes
Application?.Output.Complete(transportTask.Exception?.InnerException);
Application?.Input.Complete();
try
{
Log.WaitingForTransportAndApplication(_logger, TransportType);
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
await Task.WhenAll(applicationTask, transportTask);
Log.WaitingForApplication(_logger);
await applicationTask;
}
finally
{
Log.TransportAndApplicationComplete(_logger, TransportType);
Log.ApplicationComplete(_logger);
// Close the reading side after both sides run
Application?.Input.Complete();
Transport?.Output.Complete();
Transport?.Input.Complete();
}
}

View File

@ -166,6 +166,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
Log.EstablishedConnection(_logger);
// Allow the reads to be cancelled
connection.Cancellation = new CancellationTokenSource();
var ws = new WebSocketsTransport(options.WebSockets, connection.Application, connection, _loggerFactory);
await DoPersistentConnection(connectionDelegate, ws, context, connection);

View File

@ -138,13 +138,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
private async Task StartReceiving(WebSocket socket)
{
var token = _connection.Cancellation?.Token ?? default;
try
{
while (true)
while (!token.IsCancellationRequested)
{
#if NETCOREAPP2_2
// Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
var result = await socket.ReceiveAsync(Memory<byte>.Empty, CancellationToken.None);
var result = await socket.ReceiveAsync(Memory<byte>.Empty, token);
if (result.MessageType == WebSocketMessageType.Close)
{
@ -154,15 +156,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
var memory = _application.Output.GetMemory();
#if NETCOREAPP2_2
var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None);
var receiveResult = await socket.ReceiveAsync(memory, token);
#else
var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment);
Debug.Assert(isArray);
// Exceptions are handled above where the send and receive tasks are being run.
var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None);
var receiveResult = await socket.ReceiveAsync(arraySegment, token);
#endif
// Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read
// Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
@ -193,7 +195,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
}
catch (Exception ex)
{
if (!_aborted)
if (!_aborted && !token.IsCancellationRequested)
{
_application.Output.Complete(ex);

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
@ -96,9 +97,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
try
{
await connection.DisposeAsync(closeGracefully);
await connection.DisposeAsync(closeGracefully).OrTimeout();
}
catch
catch (Exception ex) when (!(ex is TimeoutException))
{
// Ignore the exception that bubbles out of the failing task
}
@ -166,6 +167,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
try
{
Assert.True(result.IsCompleted);
// We should be able to write
await connection.Transport.Output.WriteAsync(new byte[] { 1 });
}
finally
{
@ -176,6 +180,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
connection.TransportTask = Task.Run(async () =>
{
var result = await connection.Application.Input.ReadAsync();
Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray());
connection.Application.Input.AdvanceTo(result.Buffer.End);
result = await connection.Application.Input.ReadAsync();
try
{
Assert.True(result.IsCompleted);