Respect CancellationToken in HubConnection.StartAsync() (#10140)

This commit is contained in:
Dániel Lőczi 2019-05-22 23:37:08 +02:00 committed by Stephen Halter
parent 2cc071a41d
commit dbe9ab7dd5
10 changed files with 43 additions and 38 deletions

View File

@ -253,7 +253,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
throw new InvalidOperationException($"The {nameof(HubConnection)} cannot be started while {nameof(StopAsync)} is running.");
}
await StartAsyncCore(cancellationToken);
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _state.StopCts.Token))
{
await StartAsyncCore(cancellationToken);
}
_state.ChangeState(HubConnectionState.Connecting, HubConnectionState.Connected);
}
@ -422,7 +425,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.Starting(_logger);
// Start the connection
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat);
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat, cancellationToken);
var startingConnectionState = new ConnectionState(connection, this);
// From here on, if an error occurs we need to shut down the connection because
@ -1023,7 +1026,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
try
{
using (var handshakeCts = new CancellationTokenSource(HandshakeTimeout))
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, handshakeCts.Token, _state.StopCts.Token))
// cancellationToken already contains _state.StopCts.Token, so we don't have to link it again
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, handshakeCts.Token))
{
while (true)
{
@ -1287,7 +1291,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
var reconnectStartTime = DateTime.UtcNow;
var retryReason = closeException;
var nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts++, TimeSpan.Zero, retryReason);
// We still have the connection lock from the caller, HandleConnectionClose.
_state.AssertInConnectionLock();
@ -1347,8 +1351,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
"Someone other than Reconnect set the connection state!");
// HandshakeAsync already checks ReconnectingConnectionState.StopCts.Token.
await StartAsyncCore(CancellationToken.None);
await StartAsyncCore(_state.StopCts.Token);
Log.Reconnected(_logger, previousReconnectAttempts, DateTime.UtcNow - reconnectStartTime);

View File

@ -1,5 +1,6 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
@ -29,13 +30,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Format = transferFormat;
}
public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if ((Format & transferFormat) == 0)
{
throw new InvalidOperationException($"The '{transferFormat}' transfer format is not supported by this transport.");
}
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);

View File

@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public partial interface ITransport : System.IO.Pipelines.IDuplexPipe
{
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat);
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
System.Threading.Tasks.Task StopAsync();
}
public partial interface ITransportFactory
@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
@ -76,7 +76,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
@ -86,7 +86,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}

View File

@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public partial interface ITransport : System.IO.Pipelines.IDuplexPipe
{
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat);
System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
System.Threading.Tasks.Task StopAsync();
}
public partial interface ITransportFactory
@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
@ -76,7 +76,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}
@ -86,7 +86,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
public System.IO.Pipelines.PipeReader Input { get { throw null; } }
public System.IO.Pipelines.PipeWriter Output { get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat) { throw null; }
public System.Threading.Tasks.Task StartAsync(System.Uri url, Microsoft.AspNetCore.Connections.TransferFormat transferFormat, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task StopAsync() { throw null; }
}

View File

@ -188,11 +188,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
{
using (_logger.BeginScope(_logScope))
{
await StartAsyncCore(transferFormat).ForceAsync();
await StartAsyncCore(transferFormat, cancellationToken).ForceAsync();
}
}
private async Task StartAsyncCore(TransferFormat transferFormat)
private async Task StartAsyncCore(TransferFormat transferFormat, CancellationToken cancellationToken)
{
CheckDisposed();
@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
Log.Starting(_logger);
await SelectAndStartTransport(transferFormat);
await SelectAndStartTransport(transferFormat, cancellationToken);
_started = true;
Log.Started(_logger);
@ -288,7 +288,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
}
}
private async Task SelectAndStartTransport(TransferFormat transferFormat)
private async Task SelectAndStartTransport(TransferFormat transferFormat, CancellationToken cancellationToken)
{
var uri = _httpConnectionOptions.Url;
// Set the initial access token provider back to the original one from options
@ -301,7 +301,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
if (_httpConnectionOptions.Transports == HttpTransportType.WebSockets)
{
Log.StartingTransport(_logger, _httpConnectionOptions.Transports, uri);
await StartTransport(uri, _httpConnectionOptions.Transports, transferFormat);
await StartTransport(uri, _httpConnectionOptions.Transports, transferFormat, cancellationToken);
}
else
{
@ -315,7 +315,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
do
{
negotiationResponse = await GetNegotiationResponseAsync(uri);
negotiationResponse = await GetNegotiationResponseAsync(uri, cancellationToken);
if (negotiationResponse.Url != null)
{
@ -379,12 +379,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
// The negotiation response gets cleared in the fallback scenario.
if (negotiationResponse == null)
{
negotiationResponse = await GetNegotiationResponseAsync(uri);
negotiationResponse = await GetNegotiationResponseAsync(uri, cancellationToken);
connectUrl = CreateConnectUrl(uri, negotiationResponse.ConnectionId);
}
Log.StartingTransport(_logger, transportType, connectUrl);
await StartTransport(connectUrl, transportType, transferFormat);
await StartTransport(connectUrl, transportType, transferFormat, cancellationToken);
break;
}
}
@ -414,7 +414,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
}
}
private async Task<NegotiationResponse> NegotiateAsync(Uri url, HttpClient httpClient, ILogger logger)
private async Task<NegotiationResponse> NegotiateAsync(Uri url, HttpClient httpClient, ILogger logger, CancellationToken cancellationToken)
{
try
{
@ -436,7 +436,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
// rather than buffer the entire response. This gives a small perf boost.
// Note that it is important to dispose of the response when doing this to
// avoid leaving the connection open.
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
{
response.EnsureSuccessStatusCode();
var responseBuffer = await response.Content.ReadAsByteArrayAsync();
@ -467,7 +467,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
return Utils.AppendQueryString(url, "id=" + connectionId);
}
private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat)
private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat, CancellationToken cancellationToken)
{
// Construct the transport
var transport = _transportFactory.CreateTransport(transportType);
@ -475,7 +475,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
// Start the transport, giving it one end of the pipe
try
{
await transport.StartAsync(connectUrl, transferFormat);
await transport.StartAsync(connectUrl, transferFormat, cancellationToken);
}
catch (Exception ex)
{
@ -604,9 +604,9 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
#endif
}
private async Task<NegotiationResponse> GetNegotiationResponseAsync(Uri uri)
private async Task<NegotiationResponse> GetNegotiationResponseAsync(Uri uri, CancellationToken cancellationToken)
{
var negotiationResponse = await NegotiateAsync(uri, _httpClient, _logger);
var negotiationResponse = await NegotiateAsync(uri, _httpClient, _logger, cancellationToken);
_connectionId = negotiationResponse.ConnectionId;
_logScope.ConnectionId = _connectionId;
return negotiationResponse;

View File

@ -3,6 +3,7 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
public interface ITransport : IDuplexPipe
{
Task StartAsync(Uri url, TransferFormat transferFormat);
Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default);
Task StopAsync();
}
}

View File

@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
}
public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (transferFormat != TransferFormat.Binary && transferFormat != TransferFormat.Text)
{
@ -52,7 +52,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
// Make initial long polling request
// Server uses first long polling request to finish initializing connection and it returns without data
var request = new HttpRequestMessage(HttpMethod.Get, url);
using (var response = await _httpClient.SendAsync(request))
using (var response = await _httpClient.SendAsync(request, cancellationToken))
{
response.EnsureSuccessStatusCode();
}

View File

@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
}
public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (transferFormat != TransferFormat.Text)
{
@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
try
{
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, CancellationToken.None);
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();
}
catch

View File

@ -89,7 +89,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
_accessTokenProvider = accessTokenProvider;
}
public async Task StartAsync(Uri url, TransferFormat transferFormat)
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
if (url == null)
{
@ -121,7 +121,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
try
{
await _webSocket.ConnectAsync(resolvedUrl, CancellationToken.None);
await _webSocket.ConnectAsync(resolvedUrl, cancellationToken);
}
catch
{

View File

@ -568,7 +568,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public Task StartAsync(Uri url, TransferFormat transferFormat)
public Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
var options = ClientPipeOptions.DefaultOptions;
var pair = DuplexPipe.CreateConnectionPair(options, options);