This commit is contained in:
Justin Kotalik 2020-04-01 16:05:38 -07:00
commit b8743fc454
5 changed files with 141 additions and 24 deletions

View File

@ -235,7 +235,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private async Task StartAsyncInner(CancellationToken cancellationToken = default)
{
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: cancellationToken);
try
{
if (!_state.TryChangeState(HubConnectionState.Disconnected, HubConnectionState.Connecting))
@ -465,7 +465,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Potentially wait for StartAsync to finish, and block a new StartAsync from
// starting until we've finished stopping.
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
// Ensure that ReconnectingState.ReconnectTask is not accessed outside of the lock.
var reconnectTask = _state.ReconnectTask;
@ -478,7 +478,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// The StopCts should prevent the HubConnection from restarting until it is reset.
_state.ReleaseConnectionLock();
await reconnectTask;
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
}
ConnectionState connectionState;
@ -574,7 +574,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
async Task OnStreamCanceled(InvocationRequest irq)
{
// We need to take the connection lock in order to ensure we a) have a connection and b) are the only one accessing the write end of the pipe.
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
try
{
if (_state.CurrentConnectionStateUnsynchronized != null)
@ -601,7 +601,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
var readers = default(Dictionary<string, object>);
CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync));
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync), token: cancellationToken);
ChannelReader<object> channel;
try
@ -704,7 +704,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
while (!tokenSource.Token.IsCancellationRequested && reader.TryRead(out var item))
{
await SendWithLock(connectionState, new StreamItemMessage(streamId, item));
await SendWithLock(connectionState, new StreamItemMessage(streamId, item), tokenSource.Token);
Log.SendingStreamItem(_logger, streamId);
}
}
@ -722,7 +722,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
await foreach (var streamValue in streamValues)
{
await SendWithLock(connectionState, new StreamItemMessage(streamId, streamValue));
await SendWithLock(connectionState, new StreamItemMessage(streamId, streamValue), tokenSource.Token);
Log.SendingStreamItem(_logger, streamId);
}
}
@ -750,7 +750,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.CompletingStream(_logger, streamId);
await SendWithLock(connectionState, CompletionMessage.WithError(streamId, responseError), cts.Token);
// Don't use cancellation token here
// this is triggered by a cancellation token to tell the server that the client is done streaming
await SendWithLock(connectionState, CompletionMessage.WithError(streamId, responseError), cancellationToken: default);
}
private async Task<object> InvokeCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
@ -758,7 +760,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
var readers = default(Dictionary<string, object>);
CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync));
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync), token: cancellationToken);
Task<object> invocationTask;
try
@ -853,7 +855,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
var readers = default(Dictionary<string, object>);
CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync));
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync), token: cancellationToken);
try
{
CheckDisposed();
@ -872,10 +874,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private async Task SendWithLock(ConnectionState expectedConnectionState, HubMessage message, CancellationToken cancellationToken = default, [CallerMemberName] string callerName = "")
private async Task SendWithLock(ConnectionState expectedConnectionState, HubMessage message, CancellationToken cancellationToken, [CallerMemberName] string callerName = "")
{
CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(callerName);
var connectionState = await _state.WaitForActiveConnectionAsync(callerName, token: cancellationToken);
try
{
CheckDisposed();
@ -1245,7 +1247,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private async Task HandleConnectionClose(ConnectionState connectionState)
{
// Clear the connectionState field
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
try
{
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, connectionState),
@ -1363,7 +1365,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
Log.ReconnectingStoppedDuringRetryDelay(_logger);
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
try
{
_state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Disconnected);
@ -1378,7 +1380,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
return;
}
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
try
{
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
@ -1417,7 +1419,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts++, DateTime.UtcNow - reconnectStartTime, retryReason);
}
await _state.WaitConnectionLockAsync();
await _state.WaitConnectionLockAsync(token: default);
try
{
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
@ -1956,10 +1958,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
SafeAssert(CurrentConnectionStateUnsynchronized != null, "We don't have a connection!", memberName, fileName, lineNumber);
}
public Task WaitConnectionLockAsync([CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
public Task WaitConnectionLockAsync(CancellationToken token, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
{
Log.WaitingOnConnectionLock(_logger, memberName, filePath, lineNumber);
return _connectionLock.WaitAsync();
return _connectionLock.WaitAsync(token);
}
public bool TryAcquireConnectionLock()
@ -1968,9 +1970,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
// Don't call this method in a try/finally that releases the lock since we're also potentially releasing the connection lock here.
public async Task<ConnectionState> WaitForActiveConnectionAsync(string methodName, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
public async Task<ConnectionState> WaitForActiveConnectionAsync(string methodName, CancellationToken token, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
{
await WaitConnectionLockAsync(methodName);
await WaitConnectionLockAsync(token, methodName);
if (CurrentConnectionStateUnsynchronized == null || CurrentConnectionStateUnsynchronized.Stopping)
{

View File

@ -443,7 +443,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
})),
async (connection) =>
{
// We aggregate failures that happen when we start the transport. The operation cancelled exception will
// We aggregate failures that happen when we start the transport. The operation canceled exception will
// be an inner exception.
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await connection.StartAsync(cts.Token)).OrTimeout();
Assert.Equal(3, ex.InnerExceptions.Count);
@ -454,6 +454,29 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task CanceledCancellationTokenPassedToStartThrows()
{
using (StartVerifiableLog())
{
bool transportStartCalled = false;
var httpHandler = new TestHttpMessageHandler();
await WithConnectionAsync(
CreateConnection(httpHandler,
transport: new TestTransport(onTransportStart: () => {
transportStartCalled = true;
return Task.CompletedTask;
})),
async (connection) =>
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await connection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
});
Assert.False(transportStartCalled);
}
}
[Fact]
public async Task SSECanBeCanceled()
{

View File

@ -541,7 +541,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
try
{
await Assert.ThrowsAsync<OperationCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
await Assert.ThrowsAsync<TaskCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
Assert.False(onStartCalled);
}
finally

View File

@ -162,6 +162,98 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
[Fact]
public async Task PendingInvocationsAreCanceledWhenTokenTriggered()
{
using (StartVerifiableLog())
{
var hubConnection = CreateHubConnection(new TestConnection(), loggerFactory: LoggerFactory);
await hubConnection.StartAsync().OrTimeout();
var cts = new CancellationTokenSource();
var invokeTask = hubConnection.InvokeAsync<int>("testMethod", cancellationToken: cts.Token).OrTimeout();
cts.Cancel();
await Assert.ThrowsAsync<TaskCanceledException>(async () => await invokeTask);
}
}
[Fact]
public async Task InvokeAsyncCanceledWhenPassedCanceledToken()
{
using (StartVerifiableLog())
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
await hubConnection.StartAsync().OrTimeout();
await Assert.ThrowsAsync<TaskCanceledException>(() =>
hubConnection.InvokeAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
await hubConnection.StopAsync().OrTimeout();
// Assert that InvokeAsync didn't send a message
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
}
}
[Fact]
public async Task SendAsyncCanceledWhenPassedCanceledToken()
{
using (StartVerifiableLog())
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
await hubConnection.StartAsync().OrTimeout();
await Assert.ThrowsAsync<TaskCanceledException>(() =>
hubConnection.SendAsync("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
await hubConnection.StopAsync().OrTimeout();
// Assert that SendAsync didn't send a message
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
}
}
[Fact]
public async Task StreamAsChannelAsyncCanceledWhenPassedCanceledToken()
{
using (StartVerifiableLog())
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
await hubConnection.StartAsync().OrTimeout();
await Assert.ThrowsAsync<TaskCanceledException>(() =>
hubConnection.StreamAsChannelAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
await hubConnection.StopAsync().OrTimeout();
// Assert that StreamAsChannelAsync didn't send a message
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
}
}
[Fact]
public async Task StreamAsyncCanceledWhenPassedCanceledToken()
{
using (StartVerifiableLog())
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
await hubConnection.StartAsync().OrTimeout();
var result = hubConnection.StreamAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true));
await Assert.ThrowsAsync<TaskCanceledException>(() => result.GetAsyncEnumerator().MoveNextAsync().OrTimeout());
await hubConnection.StopAsync().OrTimeout();
// Assert that StreamAsync didn't send a message
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
}
}
[Fact]
public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages()
{
@ -362,7 +454,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
[LogLevel(LogLevel.Trace)]
public async Task UploadStreamCancelationSendsStreamComplete()
public async Task UploadStreamCancellationSendsStreamComplete()
{
using (StartVerifiableLog())
{

View File

@ -205,7 +205,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
return;
}
await _connectionLock.WaitAsync();
await _connectionLock.WaitAsync(cancellationToken);
try
{
CheckDisposed();