Merge pull request #2755 from dotnet-maestro-bot/merge/release/2.2-to-master

[automated] Merge branch 'release/2.2' => 'master'
This commit is contained in:
Andrew Stanton-Nurse 2018-08-08 08:17:40 -07:00 committed by GitHub
commit 556a234c45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 62 deletions

View File

@ -182,8 +182,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
{
Task disposeTask;
Cancellation?.Dispose();
await StateLock.WaitAsync();
try
{
@ -206,6 +204,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
finally
{
StateLock.Release();
Cancellation?.Dispose();
Cancellation = null;
}
await disposeTask;
@ -218,78 +220,65 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
// Closing gracefully means we're only going to close the finished sides of the pipe
// If the application finishes, that means it's done with the transport pipe
// If the transport finishes, that means it's done with the application pipe
if (closeGracefully)
if (!closeGracefully)
{
// Wait for either to finish
var result = await Task.WhenAny(applicationTask, transportTask);
Application?.Output.CancelPendingFlush();
// If the application is complete, complete the transport pipe (it's the pipe to the transport)
if (result == applicationTask)
if (TransportType == HttpTransportType.WebSockets)
{
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Transport?.Input.Complete();
try
{
Log.WaitingForTransport(_logger, TransportType);
// Transports are written by us and are well behaved, wait for them to drain
await transportTask;
}
finally
{
Log.TransportComplete(_logger, TransportType);
// Now complete the application
Application?.Output.Complete();
Application?.Input.Complete();
}
// The websocket transport will close the application output automatically when reading is cancelled
Cancellation?.Cancel();
}
else
{
// If the transport is complete, complete the application pipes
Application?.Output.Complete(transportTask.Exception?.InnerException);
// The other transports don't close their own output, so we can do it here safely
Application?.Output.Complete();
}
}
// Wait for either to finish
var result = await Task.WhenAny(applicationTask, transportTask);
// If the application is complete, complete the transport pipe (it's the pipe to the transport)
if (result == applicationTask)
{
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Transport?.Input.Complete();
try
{
Log.WaitingForTransport(_logger, TransportType);
// Transports are written by us and are well behaved, wait for them to drain
await transportTask;
}
finally
{
Log.TransportComplete(_logger, TransportType);
// Now complete the application
Application?.Output.Complete();
Application?.Input.Complete();
try
{
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
Log.WaitingForApplication(_logger);
await applicationTask;
}
finally
{
Log.ApplicationComplete(_logger);
Transport?.Output.Complete();
Transport?.Input.Complete();
}
}
}
else
{
Log.ShuttingDownTransportAndApplication(_logger, TransportType);
// Cancel any pending flushes from back pressure
Application?.Output.CancelPendingFlush();
// Shutdown both sides and wait for nothing
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
// If the transport is complete, complete the application pipes
Application?.Output.Complete(transportTask.Exception?.InnerException);
Application?.Input.Complete();
try
{
Log.WaitingForTransportAndApplication(_logger, TransportType);
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
await Task.WhenAll(applicationTask, transportTask);
Log.WaitingForApplication(_logger);
await applicationTask;
}
finally
{
Log.TransportAndApplicationComplete(_logger, TransportType);
Log.ApplicationComplete(_logger);
// Close the reading side after both sides run
Application?.Input.Complete();
Transport?.Output.Complete();
Transport?.Input.Complete();
}
}

View File

@ -166,6 +166,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
Log.EstablishedConnection(_logger);
// Allow the reads to be cancelled
connection.Cancellation = new CancellationTokenSource();
var ws = new WebSocketsTransport(options.WebSockets, connection.Application, connection, _loggerFactory);
await DoPersistentConnection(connectionDelegate, ws, context, connection);

View File

@ -138,13 +138,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
private async Task StartReceiving(WebSocket socket)
{
var token = _connection.Cancellation?.Token ?? default;
try
{
while (true)
while (!token.IsCancellationRequested)
{
#if NETCOREAPP2_2
// Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
var result = await socket.ReceiveAsync(Memory<byte>.Empty, CancellationToken.None);
var result = await socket.ReceiveAsync(Memory<byte>.Empty, token);
if (result.MessageType == WebSocketMessageType.Close)
{
@ -154,15 +156,15 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
var memory = _application.Output.GetMemory();
#if NETCOREAPP2_2
var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None);
var receiveResult = await socket.ReceiveAsync(memory, token);
#else
var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment);
Debug.Assert(isArray);
// Exceptions are handled above where the send and receive tasks are being run.
var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None);
var receiveResult = await socket.ReceiveAsync(arraySegment, token);
#endif
// Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read
// Need to check again for NetCoreApp2.2 because a close can happen between a 0-byte read and the actual read
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
@ -193,7 +195,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
}
catch (Exception ex)
{
if (!_aborted)
if (!_aborted && !token.IsCancellationRequested)
{
_application.Output.Complete(ex);

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
@ -96,9 +97,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
try
{
await connection.DisposeAsync(closeGracefully);
await connection.DisposeAsync(closeGracefully).OrTimeout();
}
catch
catch (Exception ex) when (!(ex is TimeoutException))
{
// Ignore the exception that bubbles out of the failing task
}
@ -166,6 +167,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
try
{
Assert.True(result.IsCompleted);
// We should be able to write
await connection.Transport.Output.WriteAsync(new byte[] { 1 });
}
finally
{
@ -176,6 +180,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Tests
connection.TransportTask = Task.Run(async () =>
{
var result = await connection.Application.Input.ReadAsync();
Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray());
connection.Application.Input.AdvanceTo(result.Buffer.End);
result = await connection.Application.Input.ReadAsync();
try
{
Assert.True(result.IsCompleted);