diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs index 711cdd4f97..a522f9a867 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs @@ -59,8 +59,6 @@ namespace Microsoft.AspNetCore.SignalR public virtual IHubProtocol Protocol { get; internal set; } - internal ExceptionDispatchInfo AbortException { get; private set; } - // Currently used only for streaming methods internal ConcurrentDictionary ActiveRequestCancellationSources { get; } = new ConcurrentDictionary(StringComparer.Ordinal); @@ -270,6 +268,8 @@ namespace Microsoft.AspNetCore.SignalR return; } + Input.CancelPendingRead(); + // We fire and forget since this can trigger user code to run Task.Factory.StartNew(_abortedCallback, this); } @@ -295,6 +295,12 @@ namespace Microsoft.AspNetCore.SignalR try { + if (result.IsCanceled) + { + Log.HandshakeCanceled(_logger); + return false; + } + if (!buffer.IsEmpty) { if (HandshakeProtocol.TryParseRequestMessage(ref buffer, out var handshakeRequestMessage)) @@ -386,12 +392,6 @@ namespace Microsoft.AspNetCore.SignalR } } - internal void Abort(Exception exception) - { - AbortException = ExceptionDispatchInfo.Capture(exception); - Abort(); - } - // Used by the HubConnectionHandler only internal Task AbortAsync() { diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs index 668016c25e..c48ea19472 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs @@ -100,6 +100,11 @@ namespace Microsoft.AspNetCore.SignalR { await DispatchMessagesAsync(connection); } + catch (OperationCanceledException) + { + // Don't treat OperationCanceledException as an error, it's basically a "control flow" + // exception to stop things from running + } catch (Exception ex) { Log.ErrorProcessingRequest(_logger, ex); @@ -164,44 +169,39 @@ namespace Microsoft.AspNetCore.SignalR private async Task DispatchMessagesAsync(HubConnectionContext connection) { - try + var input = connection.Input; + var protocol = connection.Protocol; + while (true) { - var input = connection.Input; - var protocol = connection.Protocol; - while (true) - { - var result = await input.ReadAsync(connection.ConnectionAborted); - var buffer = result.Buffer; + var result = await input.ReadAsync(); + var buffer = result.Buffer; - try + try + { + if (result.IsCanceled) { - if (!buffer.IsEmpty) + break; + } + + if (!buffer.IsEmpty) + { + while (protocol.TryParseMessage(ref buffer, _dispatcher, out var message)) { - while (protocol.TryParseMessage(ref buffer, _dispatcher, out var message)) - { - // Messages are dispatched sequentially and will block other messages from being processed until they complete. - // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. - await _dispatcher.DispatchMessageAsync(connection, message); - } - } - else if (result.IsCompleted) - { - break; + await _dispatcher.DispatchMessageAsync(connection, message); } } - finally + else if (result.IsCompleted) { - // The buffer was sliced up to where it was consumed, so we can just advance to the start. - // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data - // before yielding the read again. - input.AdvanceTo(buffer.Start, buffer.End); + break; } } - } - catch (OperationCanceledException) - { - // If there's an exception, bubble it to the caller - connection.AbortException?.Throw(); + finally + { + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + input.AdvanceTo(buffer.Start, buffer.End); + } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs b/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs index 29bbbed579..ec60f108e5 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs @@ -75,6 +75,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal public override async Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage) { + // Messages are dispatched sequentially and will stop other messages from being processed until they complete. + // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. + switch (hubMessage) { case InvocationBindingFailureMessage bindingFailureMessage: @@ -142,26 +145,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal private async Task ProcessInvocation(HubConnectionContext connection, HubMethodInvocationMessage hubMethodInvocationMessage, bool isStreamedInvocation) { - try + if (!_methods.TryGetValue(hubMethodInvocationMessage.Target, out var descriptor)) { - // If an unexpected exception occurs then we want to kill the entire connection - // by ending the processing loop - if (!_methods.TryGetValue(hubMethodInvocationMessage.Target, out var descriptor)) - { - // Send an error to the client. Then let the normal completion process occur - Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target); - await connection.WriteAsync(CompletionMessage.WithError( - hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'")); - } - else - { - await Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamedInvocation); - } + // Send an error to the client. Then let the normal completion process occur + Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target); + await connection.WriteAsync(CompletionMessage.WithError( + hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'")); } - catch (Exception ex) + else { - // Abort the entire connection if the invocation fails in an unexpected way - connection.Abort(ex); + await Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamedInvocation); } }