Allow processing of incoming invocations in parallel (#143)

* Allow processing of other incoming invocations in parallel
- Don't wait on the response of an invocation to pick up
the next message from the channel.
- Unhandled exceptions should continue bubbling up correctly
This commit is contained in:
David Fowler 2017-01-24 20:46:17 +00:00 committed by GitHub
parent 354daa2ded
commit 1e957a9e5a
1 changed files with 92 additions and 47 deletions

View File

@ -7,6 +7,7 @@ using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
@ -143,65 +144,109 @@ namespace Microsoft.AspNetCore.SignalR
{
var invocationAdapter = _registry.GetInvocationAdapter(connection.Metadata.Get<string>("formatType"));
while (await connection.Transport.Input.WaitToReadAsync())
// We use these for error handling. Since we dispatch multiple hub invocations
// in parallel, we need a way to communicate failure back to the main processing loop. The
// cancellation token is used to stop reading from the channel, the tcs
// is used to get the exception so we can bubble it up the stack
var cts = new CancellationTokenSource();
var tcs = new TaskCompletionSource<object>();
try
{
Message incomingMessage;
while (connection.Transport.Input.TryRead(out incomingMessage))
while (await connection.Transport.Input.WaitToReadAsync(cts.Token))
{
InvocationDescriptor invocationDescriptor;
using (incomingMessage)
Message incomingMessage;
while (connection.Transport.Input.TryRead(out incomingMessage))
{
var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray());
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
}
// Is there a better way of detecting that a connection was closed?
if (invocationDescriptor == null)
{
break;
}
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor);
}
InvocationResultDescriptor result;
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
{
result = await callback(connection, invocationDescriptor);
}
else
{
// If there's no method then return a failed response for this request
result = new InvocationResultDescriptor
InvocationDescriptor invocationDescriptor;
using (incomingMessage)
{
Id = invocationDescriptor.Id,
Error = $"Unknown hub method '{invocationDescriptor.Method}'"
};
var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray());
_logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method);
}
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
}
// TODO: Pool memory
var outStream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(result, outStream);
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
var outMessage = new Message(buffer, Format.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(outMessage))
// Is there a better way of detecting that a connection was closed?
if (invocationDescriptor == null)
{
break;
}
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor);
}
// Don't wait on the result of execution, continue processing other
// incoming messages on this connection.
var ignore = ProcessInvocation(connection, invocationAdapter, invocationDescriptor, cts, tcs);
}
}
}
catch (OperationCanceledException)
{
// Await the task so the exception bubbles up to the caller
await tcs.Task;
}
}
private async Task ProcessInvocation(Connection connection,
IInvocationAdapter invocationAdapter,
InvocationDescriptor invocationDescriptor,
CancellationTokenSource cts,
TaskCompletionSource<object> tcs)
{
try
{
// If an unexpected exception occurs then we want to kill the entire connection
// by ending the processing loop
await Execute(connection, invocationAdapter, invocationDescriptor);
}
catch (Exception ex)
{
// Set the exception on the task completion source
tcs.TrySetException(ex);
// Cancel reading operation
cts.Cancel();
}
}
private async Task Execute(Connection connection, IInvocationAdapter invocationAdapter, InvocationDescriptor invocationDescriptor)
{
InvocationResultDescriptor result;
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
{
result = await callback(connection, invocationDescriptor);
}
else
{
// If there's no method then return a failed response for this request
result = new InvocationResultDescriptor
{
Id = invocationDescriptor.Id,
Error = $"Unknown hub method '{invocationDescriptor.Method}'"
};
_logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method);
}
// TODO: Pool memory
var outStream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(result, outStream);
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
var outMessage = new Message(buffer, Format.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(outMessage))
{
break;
}
}
}
private void InitializeHub(THub hub, Connection connection)