From 1e957a9e5af0ea068da47ea84ab188b5b8d5e734 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 24 Jan 2017 20:46:17 +0000 Subject: [PATCH] Allow processing of incoming invocations in parallel (#143) * Allow processing of other incoming invocations in parallel - Don't wait on the response of an invocation to pick up the next message from the channel. - Unhandled exceptions should continue bubbling up correctly --- .../HubEndPoint.cs | 139 ++++++++++++------ 1 file changed, 92 insertions(+), 47 deletions(-) diff --git a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs index 163455c667..aeed94c55d 100644 --- a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs +++ b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs @@ -7,6 +7,7 @@ using System.IO; using System.IO.Pipelines; using System.Linq; using System.Reflection; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Sockets; using Microsoft.Extensions.DependencyInjection; @@ -143,65 +144,109 @@ namespace Microsoft.AspNetCore.SignalR { var invocationAdapter = _registry.GetInvocationAdapter(connection.Metadata.Get("formatType")); - while (await connection.Transport.Input.WaitToReadAsync()) + // We use these for error handling. Since we dispatch multiple hub invocations + // in parallel, we need a way to communicate failure back to the main processing loop. The + // cancellation token is used to stop reading from the channel, the tcs + // is used to get the exception so we can bubble it up the stack + var cts = new CancellationTokenSource(); + var tcs = new TaskCompletionSource(); + + try { - Message incomingMessage; - while (connection.Transport.Input.TryRead(out incomingMessage)) + while (await connection.Transport.Input.WaitToReadAsync(cts.Token)) { - InvocationDescriptor invocationDescriptor; - using (incomingMessage) + Message incomingMessage; + while (connection.Transport.Input.TryRead(out incomingMessage)) { - var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray()); - - // TODO: Handle receiving InvocationResultDescriptor - invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor; - } - - // Is there a better way of detecting that a connection was closed? - if (invocationDescriptor == null) - { - break; - } - - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor); - } - - InvocationResultDescriptor result; - Func> callback; - if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback)) - { - result = await callback(connection, invocationDescriptor); - } - else - { - // If there's no method then return a failed response for this request - result = new InvocationResultDescriptor + InvocationDescriptor invocationDescriptor; + using (incomingMessage) { - Id = invocationDescriptor.Id, - Error = $"Unknown hub method '{invocationDescriptor.Method}'" - }; + var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray()); - _logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method); - } + // TODO: Handle receiving InvocationResultDescriptor + invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor; + } - // TODO: Pool memory - var outStream = new MemoryStream(); - await invocationAdapter.WriteMessageAsync(result, outStream); - - var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve(); - var outMessage = new Message(buffer, Format.Text, endOfMessage: true); - - while (await connection.Transport.Output.WaitToWriteAsync()) - { - if (connection.Transport.Output.TryWrite(outMessage)) + // Is there a better way of detecting that a connection was closed? + if (invocationDescriptor == null) { break; } + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor); + } + + // Don't wait on the result of execution, continue processing other + // incoming messages on this connection. + var ignore = ProcessInvocation(connection, invocationAdapter, invocationDescriptor, cts, tcs); } } } + catch (OperationCanceledException) + { + // Await the task so the exception bubbles up to the caller + await tcs.Task; + } + } + + private async Task ProcessInvocation(Connection connection, + IInvocationAdapter invocationAdapter, + InvocationDescriptor invocationDescriptor, + CancellationTokenSource cts, + TaskCompletionSource tcs) + { + try + { + // If an unexpected exception occurs then we want to kill the entire connection + // by ending the processing loop + await Execute(connection, invocationAdapter, invocationDescriptor); + } + catch (Exception ex) + { + // Set the exception on the task completion source + tcs.TrySetException(ex); + + // Cancel reading operation + cts.Cancel(); + } + } + + private async Task Execute(Connection connection, IInvocationAdapter invocationAdapter, InvocationDescriptor invocationDescriptor) + { + InvocationResultDescriptor result; + Func> callback; + if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback)) + { + result = await callback(connection, invocationDescriptor); + } + else + { + // If there's no method then return a failed response for this request + result = new InvocationResultDescriptor + { + Id = invocationDescriptor.Id, + Error = $"Unknown hub method '{invocationDescriptor.Method}'" + }; + + _logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method); + } + + // TODO: Pool memory + var outStream = new MemoryStream(); + await invocationAdapter.WriteMessageAsync(result, outStream); + + var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve(); + var outMessage = new Message(buffer, Format.Text, endOfMessage: true); + + while (await connection.Transport.Output.WaitToWriteAsync()) + { + if (connection.Transport.Output.TryWrite(outMessage)) + { + break; + } + } } private void InitializeHub(THub hub, Connection connection)