Actually throwing exceptions from SendAsync (#1084)

SendAsync was using InvokeCoreAsync code to send messages. In case of exception InvokeCoreAsync is blocking and returns a task to the user so they can await for the remote call to complete. Any exception thrown is caught and used to fail the task returned to the user. SendAsync does not return a special task to the user so re-using InvokeCore resulted in swallowing exceptions. While SendAsync is fire and forget it actually should throw if the message could not be send and it was not happening.

While adding tests it turned out we did not test cases where Invoke/SendAsync/StreamAsync were invoked before starting the connection and this resulted in a NullReferenceException. I also fixed that.
This commit is contained in:
Pawel Kadluczka 2017-11-03 13:15:11 -07:00 committed by GitHub
parent 9371e7b50a
commit 06475270ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 142 additions and 36 deletions

View File

@ -299,10 +299,10 @@ describe('hubConnection', function () {
var complexObject = {
String: 'Hello, World!',
IntArray: [0x01, 0x02, 0x03, 0xff],
ByteArray: protocol.name == "json"
ByteArray: protocol.name === "json"
? btoa([0xff, 0x03, 0x02, 0x01])
: new Uint8Array([0xff, 0x03, 0x02, 0x01]),
GUID: protocol.name == "json"
GUID: protocol.name === "json"
? "00010203-0405-0607-0706-050403020100"
: new Uint8Array([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00])
};
@ -311,7 +311,7 @@ describe('hubConnection', function () {
return hubConnection.invoke('EchoComplexObject', complexObject);
})
.then(function (value) {
if (protocol.name == "messagepack") {
if (protocol.name === "messagepack") {
// msgpack creates a Buffer for byte arrays and jasmine fails to compare a Buffer
// and a Uint8Array even though Buffer instances are also Uint8Array instances
value.ByteArray = new Uint8Array(value.ByteArray);

View File

@ -37,6 +37,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private int _nextId = 0;
private volatile bool _startCalled;
public event Func<Exception, Task> Closed
{
@ -65,7 +66,17 @@ namespace Microsoft.AspNetCore.SignalR.Client
_connection.Closed += Shutdown;
}
public async Task StartAsync() => await StartAsyncCore().ForceAsync();
public async Task StartAsync()
{
try
{
await StartAsyncCore().ForceAsync();
}
finally
{
_startCalled = true;
}
}
private async Task StartAsyncCore()
{
@ -141,13 +152,18 @@ namespace Microsoft.AspNetCore.SignalR.Client
private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
if (!_startCalled)
{
throw new InvalidOperationException($"The '{nameof(StreamAsync)}' method cannot be called before the connection has been started.");
}
var invokeCts = new CancellationTokenSource();
var irq = InvocationRequest.Stream(invokeCts.Token, returnType, GetNextId(), _loggerFactory, this, out var channel);
// After InvokeCore we don't want the irq cancellation token to be triggered.
// The stream invocation will be canceled by the CancelInvocationMessage, connection closing, or channel finishing.
using (cancellationToken.Register(token => ((CancellationTokenSource)token).Cancel(), invokeCts))
{
await InvokeCore(methodName, irq, args, nonBlocking: false);
await InvokeCore(methodName, irq, args);
}
if (cancellationToken.CanBeCanceled)
@ -178,44 +194,28 @@ namespace Microsoft.AspNetCore.SignalR.Client
private async Task<object> InvokeAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
if (!_startCalled)
{
throw new InvalidOperationException($"The '{nameof(InvokeAsync)}' method cannot be called before the connection has been started.");
}
var irq = InvocationRequest.Invoke(cancellationToken, returnType, GetNextId(), _loggerFactory, this, out var task);
await InvokeCore(methodName, irq, args, nonBlocking: false);
await InvokeCore(methodName, irq, args);
return await task;
}
public async Task SendAsync(string methodName, object[] args, CancellationToken cancellationToken = default) =>
await SendAsyncCore(methodName, args, cancellationToken).ForceAsync();
private Task SendAsyncCore(string methodName, object[] args, CancellationToken cancellationToken)
{
var irq = InvocationRequest.Invoke(cancellationToken, typeof(void), GetNextId(), _loggerFactory, this, out _);
return InvokeCore(methodName, irq, args, nonBlocking: true);
}
private Task InvokeCore(string methodName, InvocationRequest irq, object[] args, bool nonBlocking)
private Task InvokeCore(string methodName, InvocationRequest irq, object[] args)
{
ThrowIfConnectionTerminated(irq.InvocationId);
if (nonBlocking)
{
_logger.PreparingNonBlockingInvocation(irq.InvocationId, methodName, args.Length);
}
else
{
_logger.PreparingBlockingInvocation(irq.InvocationId, methodName, irq.ResultType.FullName, args.Length);
}
_logger.PreparingBlockingInvocation(irq.InvocationId, methodName, irq.ResultType.FullName, args.Length);
// Create an invocation descriptor. Client invocations are always blocking
var invocationMessage = new InvocationMessage(irq.InvocationId, nonBlocking, methodName,
// Client invocations are always blocking
var invocationMessage = new InvocationMessage(irq.InvocationId, nonBlocking: false, target: methodName,
argumentBindingException: null, arguments: args);
// We don't need to track invocations for fire an forget calls
if (!nonBlocking)
{
// I just want an excuse to use 'irq' as a variable name...
_logger.RegisterInvocation(invocationMessage.InvocationId);
_logger.RegisterInvocation(invocationMessage.InvocationId);
AddInvocation(irq);
}
AddInvocation(irq);
// Trace the full invocation
_logger.IssueInvocation(invocationMessage.InvocationId, irq.ResultType.FullName, methodName, args);
@ -242,6 +242,38 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
public async Task SendAsync(string methodName, object[] args, CancellationToken cancellationToken = default) =>
await SendAsyncCore(methodName, args, cancellationToken).ForceAsync();
private async Task SendAsyncCore(string methodName, object[] args, CancellationToken cancellationToken)
{
if (!_startCalled)
{
throw new InvalidOperationException($"The '{nameof(SendAsync)}' method cannot be called before the connection has been started.");
}
var invocationMessage = new InvocationMessage(GetNextId(), nonBlocking: true, target: methodName,
argumentBindingException: null, arguments: args);
ThrowIfConnectionTerminated(invocationMessage.InvocationId);
try
{
_logger.PreparingNonBlockingInvocation(invocationMessage.InvocationId, methodName, args.Length);
var payload = _protocolReaderWriter.WriteMessage(invocationMessage);
_logger.SendInvocation(invocationMessage.InvocationId);
await _connection.SendAsync(payload, cancellationToken);
_logger.SendInvocationCompleted(invocationMessage.InvocationId);
}
catch (Exception ex)
{
_logger.SendInvocationFailed(invocationMessage.InvocationId, ex);
throw;
}
}
private async Task OnDataReceivedAsync(byte[] data)
{
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))

View File

@ -2,8 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

View File

@ -54,6 +54,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Assert.Same(exception, actualException);
}
[Fact]
public async Task SendAsyncThrowsIfSerializingMessageFails()
{
var exception = new InvalidOperationException();
var mockProtocol = MockHubProtocol.Throw(exception);
var hubConnection = new HubConnection(new TestConnection(), mockProtocol, null);
await hubConnection.StartAsync();
var actualException =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.SendAsync("test"));
Assert.Same(exception, actualException);
}
[Fact]
public async Task ClosedEventRaisedWhenTheClientIsStopped()
{
@ -71,6 +84,18 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Assert.Null(await closedEventTcs.Task.OrTimeout());
}
[Fact]
public async Task CannotCallInvokeOnNotStartedHubConnection()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.InvokeAsync<int>("test"));
Assert.Equal("The 'InvokeAsync' method cannot be called before the connection has been started.", exception.Message);
}
[Fact]
public async Task CannotCallInvokeOnClosedHubConnection()
{
@ -80,11 +105,62 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await hubConnection.StartAsync();
await hubConnection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.InvokeAsync<int>("test"));
() => hubConnection.InvokeAsync<int>("test"));
Assert.Equal("Connection has been terminated.", exception.Message);
}
[Fact]
public async Task CannotCallSendOnNotStartedHubConnection()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.SendAsync("test"));
Assert.Equal("The 'SendAsync' method cannot be called before the connection has been started.", exception.Message);
}
[Fact]
public async Task CannotCallSendOnClosedHubConnection()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
await hubConnection.StartAsync();
await hubConnection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => hubConnection.SendAsync("test"));
Assert.Equal("Connection has been terminated.", exception.Message);
}
[Fact]
public async Task CannotCallStreamOnClosedHubConnection()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
await hubConnection.StartAsync();
await hubConnection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.StreamAsync<int>("test"));
Assert.Equal("Connection has been terminated.", exception.Message);
}
[Fact]
public async Task CannotCallStreamOnNotStartedHubConnection()
{
var connection = new TestConnection();
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.StreamAsync<int>("test"));
Assert.Equal("The 'StreamAsync' method cannot be called before the connection has been started.", exception.Message);
}
[Fact]
public async Task PendingInvocationsAreCancelledWhenConnectionClosesCleanly()
{