Use TryRead and TryWrite (#113)

* Use TryRead and TryWrite
- Use TryWrite to avoid errors on channel close for /send requests
- Use TryRead until it returns false for all transports but long polling
This commit is contained in:
David Fowler 2017-01-11 12:27:18 -08:00 committed by GitHub
parent 5d374b7dbe
commit 8dc68cb798
5 changed files with 66 additions and 60 deletions

View File

@ -145,58 +145,61 @@ namespace Microsoft.AspNetCore.SignalR
while (await connection.Transport.Input.WaitToReadAsync())
{
Message message;
if (!connection.Transport.Input.TryRead(out message))
Message incomingMessage;
while (connection.Transport.Input.TryRead(out incomingMessage))
{
continue;
}
InvocationDescriptor invocationDescriptor;
using (message)
{
var inputStream = new MemoryStream(message.Payload.Buffer.ToArray());
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
}
// Is there a better way of detecting that a connection was closed?
if (invocationDescriptor == null)
{
break;
}
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor);
}
InvocationResultDescriptor result;
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
{
result = await callback(connection, invocationDescriptor);
}
else
{
// If there's no method then return a failed response for this request
result = new InvocationResultDescriptor
InvocationDescriptor invocationDescriptor;
using (incomingMessage)
{
Id = invocationDescriptor.Id,
Error = $"Unknown hub method '{invocationDescriptor.Method}'"
};
var inputStream = new MemoryStream(incomingMessage.Payload.Buffer.ToArray());
_logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method);
}
// TODO: Handle receiving InvocationResultDescriptor
invocationDescriptor = await invocationAdapter.ReadMessageAsync(inputStream, this) as InvocationDescriptor;
}
// TODO: Pool memory
var outStream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(result, outStream);
// Is there a better way of detecting that a connection was closed?
if (invocationDescriptor == null)
{
break;
}
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
if (await connection.Transport.Output.WaitToWriteAsync())
{
connection.Transport.Output.TryWrite(new Message(buffer, connection.Metadata.Format, endOfMessage: true));
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Received hub invocation: {invocation}", invocationDescriptor);
}
InvocationResultDescriptor result;
Func<Connection, InvocationDescriptor, Task<InvocationResultDescriptor>> callback;
if (_callbacks.TryGetValue(invocationDescriptor.Method, out callback))
{
result = await callback(connection, invocationDescriptor);
}
else
{
// If there's no method then return a failed response for this request
result = new InvocationResultDescriptor
{
Id = invocationDescriptor.Id,
Error = $"Unknown hub method '{invocationDescriptor.Method}'"
};
_logger.LogError("Unknown hub method '{method}'", invocationDescriptor.Method);
}
// TODO: Pool memory
var outStream = new MemoryStream();
await invocationAdapter.WriteMessageAsync(result, outStream);
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
var outMessage = new Message(buffer, connection.Metadata.Format, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{
if (connection.Transport.Output.TryWrite(outMessage))
{
break;
}
}
}
}
}

View File

@ -225,8 +225,14 @@ namespace Microsoft.AspNetCore.Sockets
format,
endOfMessage: true);
await state.Application.Output.WriteAsync(message);
// REVIEW: Do we want to return a specific status code here if the connection has ended?
while (await state.Application.Output.WaitToWriteAsync())
{
if (state.Application.Output.TryWrite(message))
{
break;
}
}
}
else
{

View File

@ -24,20 +24,17 @@ namespace Microsoft.AspNetCore.Sockets.Transports
public async Task ProcessRequestAsync(HttpContext context)
{
if (_application.Completion.IsCompleted)
{
// Client should stop if it receives a 204
_logger.LogInformation("Terminating Long Polling connection by sending 204 response.");
context.Response.StatusCode = 204;
return;
}
try
{
// TODO: We need the ability to yield the connection without completing the channel.
// This is to force ReadAsync to yield without data to end to poll but not the entire connection.
// This is for cases when the client reconnects see issue #27
await _application.WaitToReadAsync(context.RequestAborted);
if (!await _application.WaitToReadAsync(context.RequestAborted))
{
_logger.LogInformation("Terminating Long Polling connection by sending 204 response.");
context.Response.StatusCode = 204;
return;
}
Message message;
if (_application.TryRead(out message))

View File

@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
while (await _application.WaitToReadAsync(context.RequestAborted))
{
Message message;
if (_application.TryRead(out message))
while (_application.TryRead(out message))
{
using (message)
{

View File

@ -156,7 +156,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
// Get a frame from the application
Message message;
if (_application.Input.TryRead(out message))
while (_application.Input.TryRead(out message))
{
using (message)
{