Made some fixes to the disconnect sequence

- Fixed ChannelConnection to use IChannel<T> for
both sides of the connection. This allows use to close both the
input and the output when we are tearing down.
- Use TryComplete instead of complete to avoid exceptions thrown on
Complete(), particularly ChannelClosedException.
This commit is contained in:
David Fowler 2017-01-10 00:25:55 -08:00
parent a9dfd83ff4
commit baf7fce49d
5 changed files with 24 additions and 33 deletions

View File

@ -93,7 +93,6 @@ namespace Microsoft.AspNetCore.Sockets
state.Active = true;
var longPolling = new LongPollingTransport(application.Input, _loggerFactory);
RegisterLongPollingDisconnect(context, longPolling);
// Start the transport
var transportTask = longPolling.ProcessRequestAsync(context);
@ -110,6 +109,9 @@ namespace Microsoft.AspNetCore.Sockets
// REVIEW: This is super gross, this all needs to be cleaned up...
state.Close = async () =>
{
// Close the end point's connection
state.Connection.Dispose();
try
{
await endpointTask;
@ -118,8 +120,6 @@ namespace Microsoft.AspNetCore.Sockets
{
// possibly invoked on a ThreadPool thread
}
state.Connection.Dispose();
};
endpointTask = endpoint.OnConnectedAsync(state.Connection);
@ -139,6 +139,7 @@ namespace Microsoft.AspNetCore.Sockets
{
state.TerminateTransport(endpointTask.Exception.InnerException);
}
state.Connection.Dispose();
await transportTask;
@ -180,9 +181,6 @@ namespace Microsoft.AspNetCore.Sockets
HttpContext context,
ConnectionState state)
{
// Register this transport for disconnect
RegisterDisconnect(context, state);
// Start the transport
var transportTask = transport.ProcessRequestAsync(context);
@ -199,18 +197,6 @@ namespace Microsoft.AspNetCore.Sockets
await Task.WhenAll(endpointTask, transportTask);
}
private static void RegisterLongPollingDisconnect(HttpContext context, LongPollingTransport transport)
{
// For long polling, we need to end the transport but not the overall connection so we write 0 bytes
context.RequestAborted.Register(state => ((LongPollingTransport)state).Cancel(), transport);
}
private static void RegisterDisconnect(HttpContext context, ConnectionState connectionState)
{
// We just kill the output writing as a signal to the transport that it is done
context.RequestAborted.Register(state => ((ConnectionState)state).Dispose(), connectionState);
}
private Task ProcessGetId(HttpContext context, ConnectionMode mode)
{
// Establish the connection

View File

@ -11,10 +11,14 @@ namespace Microsoft.AspNetCore.Sockets.Internal
{
public class ChannelConnection<T> : IChannelConnection<T>
{
public IReadableChannel<T> Input { get; }
public IWritableChannel<T> Output { get; }
public IChannel<T> Input { get; }
public IChannel<T> Output { get; }
public ChannelConnection(IReadableChannel<T> input, IWritableChannel<T> output)
IReadableChannel<T> IChannelConnection<T>.Input => Input;
IWritableChannel<T> IChannelConnection<T>.Output => Output;
public ChannelConnection(IChannel<T> input, IChannel<T> output)
{
Input = input;
Output = output;
@ -22,9 +26,8 @@ namespace Microsoft.AspNetCore.Sockets.Internal
public void Dispose()
{
Output.Complete();
(Input as IDisposable)?.Dispose();
(Output as IDisposable)?.Dispose();
Output.TryComplete();
Input.TryComplete();
}
}
}

View File

@ -14,7 +14,6 @@ namespace Microsoft.AspNetCore.Sockets.Transports
public class LongPollingTransport : IHttpTransport
{
private readonly IReadableChannel<Message> _connection;
private CancellationTokenSource _cancellationSource = new CancellationTokenSource();
private readonly ILogger _logger;
public LongPollingTransport(IReadableChannel<Message> connection, ILoggerFactory loggerFactory)
@ -35,7 +34,10 @@ namespace Microsoft.AspNetCore.Sockets.Transports
try
{
using (var message = await _connection.ReadAsync(_cancellationSource.Token))
// TODO: We need the ability to yield the connection without completing the channel.
// This is to force ReadAsync to yield without data to end to poll but not the entire connection.
// This is for cases when the client reconnects see issue #27
using (var message = await _connection.ReadAsync(context.RequestAborted))
{
_logger.LogDebug("Writing {0} byte message to response", message.Payload.Buffer.Length);
context.Response.ContentLength = message.Payload.Buffer.Length;
@ -58,10 +60,5 @@ namespace Microsoft.AspNetCore.Sockets.Transports
throw;
}
}
public void Cancel()
{
_cancellationSource.Cancel();
}
}
}

View File

@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
while (true)
{
using (var message = await _application.ReadAsync())
using (var message = await _application.ReadAsync(context.RequestAborted))
{
await Send(context, message);
}
@ -42,6 +42,10 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
// Gross that we have to catch this this way. See https://github.com/dotnet/corefxlab/issues/1068
}
catch (OperationCanceledException)
{
// Closed connection
}
}
private async Task Send(HttpContext context, Message message)

View File

@ -152,7 +152,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
private async Task StartSending(IWebSocketConnection ws)
{
while (!_connection.Input.Completion.IsCompleted)
while (true)
{
// Get a frame from the application
try
@ -188,6 +188,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
catch (Exception ex) when (ex.GetType().IsNested && ex.GetType().DeclaringType == typeof(Channel))
{
// Gross that we have to catch this this way. See https://github.com/dotnet/corefxlab/issues/1068
break;
}
}
}