From baf7fce49d444f4ffba8a98075182ef51c9b824c Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 10 Jan 2017 00:25:55 -0800 Subject: [PATCH] Made some fixes to the disconnect sequence - Fixed ChannelConnection to use IChannel for both sides of the connection. This allows use to close both the input and the output when we are tearing down. - Use TryComplete instead of complete to avoid exceptions thrown on Complete(), particularly ChannelClosedException. --- .../HttpConnectionDispatcher.cs | 22 ++++--------------- .../Internal/ChannelConnection.cs | 15 ++++++++----- .../Transports/LongPollingTransport.cs | 11 ++++------ .../Transports/ServerSentEventsTransport.cs | 6 ++++- .../Transports/WebSocketsTransport.cs | 3 ++- 5 files changed, 24 insertions(+), 33 deletions(-) diff --git a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs index 5e6693942f..c83dac4851 100644 --- a/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs @@ -93,7 +93,6 @@ namespace Microsoft.AspNetCore.Sockets state.Active = true; var longPolling = new LongPollingTransport(application.Input, _loggerFactory); - RegisterLongPollingDisconnect(context, longPolling); // Start the transport var transportTask = longPolling.ProcessRequestAsync(context); @@ -110,6 +109,9 @@ namespace Microsoft.AspNetCore.Sockets // REVIEW: This is super gross, this all needs to be cleaned up... state.Close = async () => { + // Close the end point's connection + state.Connection.Dispose(); + try { await endpointTask; @@ -118,8 +120,6 @@ namespace Microsoft.AspNetCore.Sockets { // possibly invoked on a ThreadPool thread } - - state.Connection.Dispose(); }; endpointTask = endpoint.OnConnectedAsync(state.Connection); @@ -139,6 +139,7 @@ namespace Microsoft.AspNetCore.Sockets { state.TerminateTransport(endpointTask.Exception.InnerException); } + state.Connection.Dispose(); await transportTask; @@ -180,9 +181,6 @@ namespace Microsoft.AspNetCore.Sockets HttpContext context, ConnectionState state) { - // Register this transport for disconnect - RegisterDisconnect(context, state); - // Start the transport var transportTask = transport.ProcessRequestAsync(context); @@ -199,18 +197,6 @@ namespace Microsoft.AspNetCore.Sockets await Task.WhenAll(endpointTask, transportTask); } - private static void RegisterLongPollingDisconnect(HttpContext context, LongPollingTransport transport) - { - // For long polling, we need to end the transport but not the overall connection so we write 0 bytes - context.RequestAborted.Register(state => ((LongPollingTransport)state).Cancel(), transport); - } - - private static void RegisterDisconnect(HttpContext context, ConnectionState connectionState) - { - // We just kill the output writing as a signal to the transport that it is done - context.RequestAborted.Register(state => ((ConnectionState)state).Dispose(), connectionState); - } - private Task ProcessGetId(HttpContext context, ConnectionMode mode) { // Establish the connection diff --git a/src/Microsoft.AspNetCore.Sockets/Internal/ChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets/Internal/ChannelConnection.cs index c6f989c5c9..facc91a2c5 100644 --- a/src/Microsoft.AspNetCore.Sockets/Internal/ChannelConnection.cs +++ b/src/Microsoft.AspNetCore.Sockets/Internal/ChannelConnection.cs @@ -11,10 +11,14 @@ namespace Microsoft.AspNetCore.Sockets.Internal { public class ChannelConnection : IChannelConnection { - public IReadableChannel Input { get; } - public IWritableChannel Output { get; } + public IChannel Input { get; } + public IChannel Output { get; } - public ChannelConnection(IReadableChannel input, IWritableChannel output) + IReadableChannel IChannelConnection.Input => Input; + + IWritableChannel IChannelConnection.Output => Output; + + public ChannelConnection(IChannel input, IChannel output) { Input = input; Output = output; @@ -22,9 +26,8 @@ namespace Microsoft.AspNetCore.Sockets.Internal public void Dispose() { - Output.Complete(); - (Input as IDisposable)?.Dispose(); - (Output as IDisposable)?.Dispose(); + Output.TryComplete(); + Input.TryComplete(); } } } diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs index fc05d1b0a0..db2c7dc7c8 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/LongPollingTransport.cs @@ -14,7 +14,6 @@ namespace Microsoft.AspNetCore.Sockets.Transports public class LongPollingTransport : IHttpTransport { private readonly IReadableChannel _connection; - private CancellationTokenSource _cancellationSource = new CancellationTokenSource(); private readonly ILogger _logger; public LongPollingTransport(IReadableChannel connection, ILoggerFactory loggerFactory) @@ -35,7 +34,10 @@ namespace Microsoft.AspNetCore.Sockets.Transports try { - using (var message = await _connection.ReadAsync(_cancellationSource.Token)) + // TODO: We need the ability to yield the connection without completing the channel. + // This is to force ReadAsync to yield without data to end to poll but not the entire connection. + // This is for cases when the client reconnects see issue #27 + using (var message = await _connection.ReadAsync(context.RequestAborted)) { _logger.LogDebug("Writing {0} byte message to response", message.Payload.Buffer.Length); context.Response.ContentLength = message.Payload.Buffer.Length; @@ -58,10 +60,5 @@ namespace Microsoft.AspNetCore.Sockets.Transports throw; } } - - public void Cancel() - { - _cancellationSource.Cancel(); - } } } diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs index 8a1b05ffe3..e69744d0d1 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/ServerSentEventsTransport.cs @@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports { while (true) { - using (var message = await _application.ReadAsync()) + using (var message = await _application.ReadAsync(context.RequestAborted)) { await Send(context, message); } @@ -42,6 +42,10 @@ namespace Microsoft.AspNetCore.Sockets.Transports { // Gross that we have to catch this this way. See https://github.com/dotnet/corefxlab/issues/1068 } + catch (OperationCanceledException) + { + // Closed connection + } } private async Task Send(HttpContext context, Message message) diff --git a/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs index bd5836174e..74fd027f94 100644 --- a/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets/Transports/WebSocketsTransport.cs @@ -152,7 +152,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports private async Task StartSending(IWebSocketConnection ws) { - while (!_connection.Input.Completion.IsCompleted) + while (true) { // Get a frame from the application try @@ -188,6 +188,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports catch (Exception ex) when (ex.GetType().IsNested && ex.GetType().DeclaringType == typeof(Channel)) { // Gross that we have to catch this this way. See https://github.com/dotnet/corefxlab/issues/1068 + break; } } }