Simplify error handling in HubConnectionHandler (#2162)

* Simplify error handling in HubConnectionHandler
- Since we execute hub methods inline, there's no need to abort the connection on unexpected exceptions.
- Don't pass the cancellation token to ReadAsync, instead just use CancelPendingRead.
- Don't treat OperationCancelledException errors as errors.
This commit is contained in:
David Fowler 2018-04-29 22:51:16 -07:00 committed by GitHub
parent f89400cf12
commit ab451b53b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 54 deletions

View File

@ -59,8 +59,6 @@ namespace Microsoft.AspNetCore.SignalR
public virtual IHubProtocol Protocol { get; internal set; }
internal ExceptionDispatchInfo AbortException { get; private set; }
// Currently used only for streaming methods
internal ConcurrentDictionary<string, CancellationTokenSource> ActiveRequestCancellationSources { get; } = new ConcurrentDictionary<string, CancellationTokenSource>(StringComparer.Ordinal);
@ -270,6 +268,8 @@ namespace Microsoft.AspNetCore.SignalR
return;
}
Input.CancelPendingRead();
// We fire and forget since this can trigger user code to run
Task.Factory.StartNew(_abortedCallback, this);
}
@ -295,6 +295,12 @@ namespace Microsoft.AspNetCore.SignalR
try
{
if (result.IsCanceled)
{
Log.HandshakeCanceled(_logger);
return false;
}
if (!buffer.IsEmpty)
{
if (HandshakeProtocol.TryParseRequestMessage(ref buffer, out var handshakeRequestMessage))
@ -386,12 +392,6 @@ namespace Microsoft.AspNetCore.SignalR
}
}
internal void Abort(Exception exception)
{
AbortException = ExceptionDispatchInfo.Capture(exception);
Abort();
}
// Used by the HubConnectionHandler only
internal Task AbortAsync()
{

View File

@ -100,6 +100,11 @@ namespace Microsoft.AspNetCore.SignalR
{
await DispatchMessagesAsync(connection);
}
catch (OperationCanceledException)
{
// Don't treat OperationCanceledException as an error, it's basically a "control flow"
// exception to stop things from running
}
catch (Exception ex)
{
Log.ErrorProcessingRequest(_logger, ex);
@ -164,44 +169,39 @@ namespace Microsoft.AspNetCore.SignalR
private async Task DispatchMessagesAsync(HubConnectionContext connection)
{
try
var input = connection.Input;
var protocol = connection.Protocol;
while (true)
{
var input = connection.Input;
var protocol = connection.Protocol;
while (true)
{
var result = await input.ReadAsync(connection.ConnectionAborted);
var buffer = result.Buffer;
var result = await input.ReadAsync();
var buffer = result.Buffer;
try
try
{
if (result.IsCanceled)
{
if (!buffer.IsEmpty)
break;
}
if (!buffer.IsEmpty)
{
while (protocol.TryParseMessage(ref buffer, _dispatcher, out var message))
{
while (protocol.TryParseMessage(ref buffer, _dispatcher, out var message))
{
// Messages are dispatched sequentially and will block other messages from being processed until they complete.
// Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run.
await _dispatcher.DispatchMessageAsync(connection, message);
}
}
else if (result.IsCompleted)
{
break;
await _dispatcher.DispatchMessageAsync(connection, message);
}
}
finally
else if (result.IsCompleted)
{
// The buffer was sliced up to where it was consumed, so we can just advance to the start.
// We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again.
input.AdvanceTo(buffer.Start, buffer.End);
break;
}
}
}
catch (OperationCanceledException)
{
// If there's an exception, bubble it to the caller
connection.AbortException?.Throw();
finally
{
// The buffer was sliced up to where it was consumed, so we can just advance to the start.
// We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data
// before yielding the read again.
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}

View File

@ -75,6 +75,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
public override async Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage)
{
// Messages are dispatched sequentially and will stop other messages from being processed until they complete.
// Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run.
switch (hubMessage)
{
case InvocationBindingFailureMessage bindingFailureMessage:
@ -142,26 +145,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal
private async Task ProcessInvocation(HubConnectionContext connection,
HubMethodInvocationMessage hubMethodInvocationMessage, bool isStreamedInvocation)
{
try
if (!_methods.TryGetValue(hubMethodInvocationMessage.Target, out var descriptor))
{
// If an unexpected exception occurs then we want to kill the entire connection
// by ending the processing loop
if (!_methods.TryGetValue(hubMethodInvocationMessage.Target, out var descriptor))
{
// Send an error to the client. Then let the normal completion process occur
Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target);
await connection.WriteAsync(CompletionMessage.WithError(
hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'"));
}
else
{
await Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamedInvocation);
}
// Send an error to the client. Then let the normal completion process occur
Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target);
await connection.WriteAsync(CompletionMessage.WithError(
hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'"));
}
catch (Exception ex)
else
{
// Abort the entire connection if the invocation fails in an unexpected way
connection.Abort(ex);
await Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamedInvocation);
}
}