Making HttpConnection restartable (C#) (#1147)

🎉
This commit is contained in:
Pawel Kadluczka 2017-12-12 10:08:42 -08:00 committed by Andrew Stanton-Nurse
parent 75e102f97a
commit 66ab939cff
26 changed files with 1145 additions and 281 deletions

View File

@ -9,6 +9,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-539
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{83B2C3EB-A3D8-4E6F-9A3C-A380B005EF31}"
ProjectSection(SolutionItems) = preProject
build\dependencies.props = build\dependencies.props
Directory.Build.props = Directory.Build.props
Directory.Build.targets = Directory.Build.targets
build\Key.snk = build\Key.snk
@ -85,7 +86,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtSample", "samples\JwtSam
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtClientSample", "samples\JwtClientSample\JwtClientSample.csproj", "{1A953296-E869-4DE2-A693-FD5FCDE27057}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution

View File

@ -28,6 +28,16 @@
"integrity": "sha512-zT+t9841g1HsjLtPMCYxmb1U4pcZ2TOegAKiomlmj6bIziuaEYHUavxLE9NRwdntY0vOCrgHho6OXjDX7fm/Kw==",
"dev": true
},
"JSONStream": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/JSONStream/-/JSONStream-1.3.1.tgz",
"integrity": "sha1-cH92HgHa6eFvG8+TcDt4xwlmV5o=",
"dev": true,
"requires": {
"jsonparse": "1.3.1",
"through": "2.3.8"
}
},
"acorn": {
"version": "4.0.13",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-4.0.13.tgz",
@ -1017,9 +1027,9 @@
"integrity": "sha1-+GzWzvT1MAyOY+B6TVEvZfv/RTE=",
"dev": true,
"requires": {
"JSONStream": "1.3.1",
"combine-source-map": "0.7.2",
"defined": "1.0.0",
"JSONStream": "1.3.1",
"through2": "2.0.3",
"umd": "3.0.1"
}
@ -1047,6 +1057,7 @@
"integrity": "sha1-tanJAgJD8McORnW+yCI7xifkFc4=",
"dev": true,
"requires": {
"JSONStream": "1.3.1",
"assert": "1.4.1",
"browser-pack": "6.0.2",
"browser-resolve": "1.11.2",
@ -1068,7 +1079,6 @@
"https-browserify": "0.0.1",
"inherits": "2.0.3",
"insert-module-globals": "7.0.1",
"JSONStream": "1.3.1",
"labeled-stream-splicer": "2.0.0",
"module-deps": "4.1.1",
"os-browserify": "0.1.2",
@ -2487,10 +2497,10 @@
"integrity": "sha1-wDv04BywhtW15azorQr+eInWOMM=",
"dev": true,
"requires": {
"JSONStream": "1.3.1",
"combine-source-map": "0.7.2",
"concat-stream": "1.5.2",
"is-buffer": "1.1.5",
"JSONStream": "1.3.1",
"lexical-scope": "1.2.0",
"process": "0.11.10",
"through2": "2.0.3",
@ -2763,16 +2773,6 @@
"integrity": "sha1-P02uSpH6wxX3EGL4UhzCOfE2YoA=",
"dev": true
},
"JSONStream": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/JSONStream/-/JSONStream-1.3.1.tgz",
"integrity": "sha1-cH92HgHa6eFvG8+TcDt4xwlmV5o=",
"dev": true,
"requires": {
"jsonparse": "1.3.1",
"through": "2.3.8"
}
},
"kind-of": {
"version": "3.2.2",
"resolved": "https://registry.npmjs.org/kind-of/-/kind-of-3.2.2.tgz",
@ -3126,6 +3126,7 @@
"integrity": "sha1-IyFYM/HaE/1gbMuAh7RIUty4If0=",
"dev": true,
"requires": {
"JSONStream": "1.3.1",
"browser-resolve": "1.11.2",
"cached-path-relative": "1.0.1",
"concat-stream": "1.5.2",
@ -3133,7 +3134,6 @@
"detective": "4.5.0",
"duplexer2": "0.1.4",
"inherits": "2.0.3",
"JSONStream": "1.3.1",
"parents": "1.0.1",
"readable-stream": "2.2.11",
"resolve": "1.3.3",

View File

@ -30,11 +30,22 @@ namespace ClientSample
baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;
Console.WriteLine("Connecting to {0}", baseUrl);
HubConnection connection = await ConnectAsync(baseUrl);
Console.WriteLine("Connected to {0}", baseUrl);
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.WithConsoleLogger(LogLevel.Trace)
.Build();
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
// Set up handler
connection.On<string>("Send", Console.WriteLine);
await ConnectAsync(connection);
Console.WriteLine("Connected to {0}", baseUrl);
var sendCts = new CancellationTokenSource();
Console.CancelKeyPress += async (sender, a) =>
@ -45,13 +56,10 @@ namespace ClientSample
await connection.DisposeAsync();
};
// Set up handler
connection.On<string>("Send", Console.WriteLine);
while (!connection.Closed.IsCompleted)
while (!closeTcs.Task.IsCompleted)
{
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), connection.Closed);
if (completedTask == connection.Closed)
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), closeTcs.Task);
if (completedTask == closeTcs.Task)
{
break;
}
@ -79,19 +87,15 @@ namespace ClientSample
return 0;
}
private static async Task<HubConnection> ConnectAsync(string baseUrl)
private static async Task ConnectAsync(HubConnection connection)
{
// Keep trying to until we can start
while (true)
{
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.WithConsoleLogger(LogLevel.Trace)
.Build();
try
{
await connection.StartAsync();
return connection;
}
catch (Exception)
{

View File

@ -39,8 +39,9 @@ namespace ClientSample
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));
await connection.StartAsync();
Console.WriteLine($"Connected to {baseUrl}");
@ -51,7 +52,7 @@ namespace ClientSample
await connection.DisposeAsync();
};
while (!connection.Closed.IsCompleted)
while (!closeTcs.Task.IsCompleted)
{
var line = await Task.Run(() => Console.ReadLine(), cts.Token);

View File

@ -34,6 +34,9 @@ namespace JwtClientSample
.WithJwtBearer(() => _tokens[userId])
.Build();
var closedTcs = new TaskCompletionSource<object>();
hubConnection.Closed += e => closedTcs.SetResult(null);
hubConnection.On<string, string>("Message", (sender, message) => Console.WriteLine($"[{userId}] {sender}: {message}"));
await hubConnection.StartAsync();
Console.WriteLine($"[{userId}] Connection Started");
@ -43,7 +46,7 @@ namespace JwtClientSample
try
{
while (!hubConnection.Closed.IsCompleted)
while (!closedTcs.Task.IsCompleted)
{
await Task.Delay(1000);
ticks++;

View File

@ -34,16 +34,16 @@ namespace Microsoft.AspNetCore.SignalR.Client
private HubProtocolReaderWriter _protocolReaderWriter;
private readonly object _pendingCallsLock = new object();
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>();
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private CancellationTokenSource _connectionActive;
private int _nextId = 0;
private volatile bool _startCalled;
private Timer _timeoutTimer;
private bool _needKeepAlive;
public Task Closed { get; }
public event Action<Exception> Closed;
/// <summary>
/// Gets or sets the server timeout interval for the connection. Changes to this value
@ -69,11 +69,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
Closed = _connection.Closed.ContinueWith(task =>
{
Shutdown(task.Exception);
return task;
}).Unwrap();
_connection.Closed += e => Shutdown(e);
// Create the timer for timeout, but disabled by default (we enable it when started).
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
@ -122,12 +118,14 @@ namespace Microsoft.AspNetCore.SignalR.Client
transferModeFeature.TransferMode = requestedTransferMode;
await _connection.StartAsync();
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;
var actualTransferMode = transferModeFeature.TransferMode;
_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
_logger.HubProtocol(_protocol.Name);
_connectionActive = new CancellationTokenSource();
using (var memoryStream = new MemoryStream())
{
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
@ -151,13 +149,16 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new PassThroughEncoder();
}
public async Task StopAsync() => await StopAsyncCore().ForceAsync();
private Task StopAsyncCore() => _connection.StopAsync();
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
private async Task DisposeAsyncCore()
{
_timeoutTimer.Dispose();
await _connection.DisposeAsync();
await Closed;
}
// TODO: Client return values/tasks?
@ -370,12 +371,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private void Shutdown(Exception ex = null)
private void Shutdown(Exception exception = null)
{
_logger.ShutdownConnection();
if (ex != null)
if (exception != null)
{
_logger.ShutdownWithError(ex);
_logger.ShutdownWithError(exception);
}
lock (_pendingCallsLock)
@ -388,14 +389,23 @@ namespace Microsoft.AspNetCore.SignalR.Client
foreach (var outstandingCall in _pendingCalls.Values)
{
_logger.RemoveInvocation(outstandingCall.InvocationId);
if (ex != null)
if (exception != null)
{
outstandingCall.Fail(ex);
outstandingCall.Fail(exception);
}
outstandingCall.Dispose();
}
_pendingCalls.Clear();
}
try
{
Closed?.Invoke(exception);
}
catch (Exception ex)
{
_logger.ErrorDuringClosedEvent(ex);
}
}
private async Task DispatchInvocationAsync(InvocationMessage invocation, CancellationToken cancellationToken)

View File

@ -88,6 +88,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Internal
private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");
private static readonly Action<ILogger, Exception> _errorDuringClosedEvent =
LoggerMessage.Define(LogLevel.Error, new EventId(26, nameof(ErrorDuringClosedEvent)), "An exception was thrown in the handler for the Closed event.");
// Category: Streaming and NonStreaming
private static readonly Action<ILogger, string, Exception> _invocationCreated =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
@ -292,5 +295,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Internal
{
_resettingKeepAliveTimer(logger, null);
}
public static void ErrorDuringClosedEvent(this ILogger logger, Exception exception)
{
_errorDuringClosedEvent(logger, exception);
}
}
}

View File

@ -12,12 +12,13 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
Task StartAsync();
Task SendAsync(byte[] data, CancellationToken cancellationToken);
Task StopAsync();
Task DisposeAsync();
Task AbortAsync(Exception ex);
IDisposable OnReceived(Func<byte[], object, Task> callback, object state);
Task Closed { get; }
event Action<Exception> Closed;
IFeatureCollection Features { get; }
}

View File

@ -28,17 +28,20 @@ namespace Microsoft.AspNetCore.Sockets.Client
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private volatile int _connectionState = ConnectionState.Initial;
private volatile ConnectionState _connectionState = ConnectionState.Disconnected;
private readonly object _stateChangeLock = new object();
private volatile ChannelConnection<byte[], SendMessage> _transportChannel;
private readonly HttpClient _httpClient;
private readonly HttpOptions _httpOptions;
private volatile ITransport _transport;
private volatile Task _receiveLoopTask;
private TaskCompletionSource<object> _startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<object> _closedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private TaskQueue _eventQueue = new TaskQueue();
private TaskCompletionSource<object> _startTcs;
private TaskCompletionSource<object> _closeTcs;
private TaskQueue _eventQueue;
private readonly ITransportFactory _transportFactory;
private string _connectionId;
private Exception _abortException;
private readonly TimeSpan _eventQueueDrainTimeout = TimeSpan.FromSeconds(5);
private ChannelReader<byte[]> Input => _transportChannel.Input;
private ChannelWriter<SendMessage> Output => _transportChannel.Output;
@ -49,7 +52,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
public IFeatureCollection Features { get; } = new FeatureCollection();
public Task Closed => _closedTcs.Task;
public event Action<Exception> Closed;
public HttpConnection(Uri url)
: this(url, TransportType.All)
@ -103,25 +106,26 @@ namespace Microsoft.AspNetCore.Sockets.Client
private Task StartAsyncCore()
{
if (Interlocked.CompareExchange(ref _connectionState, ConnectionState.Connecting, ConnectionState.Initial)
!= ConnectionState.Initial)
if (ChangeState(from: ConnectionState.Disconnected, to: ConnectionState.Connecting) != ConnectionState.Disconnected)
{
return Task.FromException(
new InvalidOperationException("Cannot start a connection that is not in the Initial state."));
new InvalidOperationException($"Cannot start a connection that is not in the {nameof(ConnectionState.Disconnected)} state."));
}
_startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_eventQueue = new TaskQueue();
StartAsyncInternal()
.ContinueWith(t =>
{
if (t.IsFaulted)
var abortException = _abortException;
if (t.IsFaulted || abortException != null)
{
_startTcs.SetException(t.Exception.InnerException);
_closedTcs.TrySetException(t.Exception.InnerException);
_startTcs.SetException(_abortException ?? t.Exception.InnerException);
}
else if (t.IsCanceled)
{
_startTcs.SetCanceled();
_closedTcs.SetCanceled();
}
else
{
@ -148,8 +152,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
var negotiationResponse = await Negotiate(Url, _httpClient, _logger);
_connectionId = negotiationResponse.ConnectionId;
// Connection is being stopped while start was in progress
if (_connectionState == ConnectionState.Disconnected)
// Connection is being disposed while start was in progress
if (_connectionState == ConnectionState.Disposed)
{
_logger.HttpConnectionClosed(_connectionId);
return;
@ -164,17 +168,25 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
catch
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
// The connection can now be either in the Connecting or Disposed state - only change the state to
// Disconnected if the connection was in the Connecting state to not resurrect a Disposed connection
ChangeState(from: ConnectionState.Connecting, to: ConnectionState.Disconnected);
throw;
}
// if the connection is not in the Connecting state here it means the user called DisposeAsync
if (Interlocked.CompareExchange(ref _connectionState, ConnectionState.Connected, ConnectionState.Connecting)
== ConnectionState.Connecting)
// if the connection is not in the Connecting state here it means the user called DisposeAsync while
// the connection was starting
if (ChangeState(from: ConnectionState.Connecting, to: ConnectionState.Connected) == ConnectionState.Connecting)
{
_closeTcs = new TaskCompletionSource<object>();
_ = Input.Completion.ContinueWith(async t =>
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
// Grab the exception and then clear it.
// See comment at AbortAsync for more discussion on the thread-safety
// StartAsync can't be called until the ChangeState below, so we're OK.
var abortException = _abortException;
_abortException = null;
// There is an inherent race between receive and close. Removing the last message from the channel
// makes Input.Completion task completed and runs this continuation. We need to await _receiveLoopTask
@ -187,28 +199,38 @@ namespace Microsoft.AspNetCore.Sockets.Client
await _receiveLoopTask;
_logger.DrainEvents(_connectionId);
await _eventQueue.Drain();
await Task.WhenAny(_eventQueue.Drain().NoThrow(), Task.Delay(_eventQueueDrainTimeout));
_httpClient?.Dispose();
_logger.CompleteClosed(_connectionId);
if (t.IsFaulted)
// At this point the connection can be either in the Connected or Disposed state. The state should be changed
// to the Disconnected state only if it was in the Connected state.
// From this point on, StartAsync can be called at any time.
ChangeState(from: ConnectionState.Connected, to: ConnectionState.Disconnected);
_closeTcs.SetResult(null);
try
{
_closedTcs.TrySetException(t.Exception.InnerException);
if (t.IsFaulted)
{
Closed?.Invoke(t.Exception.InnerException);
}
else
{
// Call the closed event. If there was an abort exception, it will be flowed forward
// However, if there wasn't, this will just be null and we're good
Closed?.Invoke(abortException);
}
}
if (t.IsCanceled)
catch (Exception ex)
{
_closedTcs.TrySetCanceled();
}
else
{
_closedTcs.TrySetResult(null);
// Suppress (but log) the exception, this is user code
_logger.ErrorDuringClosedEvent(ex);
}
});
// start receive loop only after the Connected event was raised to
// avoid Received event being raised before the Connected event
_receiveLoopTask = ReceiveAsync();
}
}
@ -306,7 +328,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
await _transport.StartAsync(connectUrl, applicationSide, GetTransferMode(), _connectionId, this);
// actual transfer mode can differ from the one that was requested so set it on the feature
Debug.Assert(_transport.Mode.HasValue, "transfer mode not set after transport started");
if (!_transport.Mode.HasValue)
{
// This can happen with custom transports so it should be an exception, not an assert.
throw new InvalidOperationException("Transport was expected to set the Mode property after StartAsync, but it has not been set.");
}
SetTransferMode(_transport.Mode.Value);
}
catch (Exception ex)
@ -435,31 +461,42 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
}
public async Task AbortAsync(Exception ex) => await DisposeAsyncCore(ex ?? new InvalidOperationException("Connection aborted")).ForceAsync();
// AbortAsync creates a few thread-safety races that we are OK with.
// 1. If the transport shuts down gracefully after AbortAsync is called but BEFORE _abortException is called, then the
// Closed event will not receive the Abort exception. This is OK because technically the transport was shut down gracefully
// before it was aborted
// 2. If the transport is closed gracefully and then AbortAsync is called before it captures the _abortException value
// the graceful shutdown could be turned into an abort. However, again, this is an inherent race between two different conditions:
// The transport shutting down because the server went away, and the user requesting the Abort
// 3. Finally, because this is an instance field, there is a possible race around accidentally re-using _abortException in the restarted
// connection. The scenario here is: AbortAsync(someException); StartAsync(); CloseAsync(); Where the _abortException value from the
// first AbortAsync call is still set at the time CloseAsync gets to calling the Closed event. However, this can't happen because the
// StartAsync method can't be called until the connection state is changed to Disconnected, which happens AFTER the close code
// captures and resets _abortException.
public async Task AbortAsync(Exception exception) => await StopAsyncCore(exception ?? throw new ArgumentNullException(nameof(exception))).ForceAsync();
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
public async Task StopAsync() => await StopAsyncCore(exception: null).ForceAsync();
private async Task DisposeAsyncCore(Exception ex = null)
private async Task StopAsyncCore(Exception exception)
{
if (ex != null)
lock (_stateChangeLock)
{
_logger.AbortingClient(_connectionId, ex);
// Immediately fault the close task. When the transport shuts down,
// it will trigger the close task to be completed, so we want it to be
// marked faulted before that happens
_closedTcs.TrySetException(ex);
}
else
{
_logger.StoppingClient(_connectionId);
if (!(_connectionState == ConnectionState.Connecting || _connectionState == ConnectionState.Connected))
{
_logger.SkippingStop(_connectionId);
return;
}
}
if (Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected) == ConnectionState.Initial)
{
// the connection was never started so there is nothing to clean up
return;
}
// Note that this method can be called at the same time when the connection is being closed from the server
// side due to an error. We are resilient to this since we merely try to close the channel here and the
// channel can be closed only once. As a result the continuation that does actual job and raises the Closed
// event runs always only once.
// See comment at AbortAsync for more discussion on the thread-safety of this.
_abortException = exception;
_logger.StoppingClient(_connectionId);
try
{
@ -486,8 +523,29 @@ namespace Microsoft.AspNetCore.Sockets.Client
await _receiveLoopTask;
}
// If we haven't already done so, trigger the Closed task.
_closedTcs.TrySetResult(null);
if (_closeTcs != null)
{
await _closeTcs.Task;
}
}
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
private async Task DisposeAsyncCore()
{
// This will no-op if we're already stopped
await StopAsyncCore(exception: null);
if (ChangeState(to: ConnectionState.Disposed) == ConnectionState.Disposed)
{
_logger.SkippingDispose(_connectionId);
// the connection was already disposed
return;
}
_logger.DisposingClient(_connectionId);
_httpClient?.Dispose();
}
@ -537,12 +595,39 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
}
private class ConnectionState
private ConnectionState ChangeState(ConnectionState from, ConnectionState to)
{
public const int Initial = 0;
public const int Connecting = 1;
public const int Connected = 2;
public const int Disconnected = 3;
lock (_stateChangeLock)
{
var state = _connectionState;
if (_connectionState == from)
{
_connectionState = to;
}
_logger.ConnectionStateChanged(_connectionId, state, to);
return state;
}
}
private ConnectionState ChangeState(ConnectionState to)
{
lock (_stateChangeLock)
{
var state = _connectionState;
_connectionState = to;
_logger.ConnectionStateChanged(_connectionId, state, to);
return state;
}
}
// Internal because it's used by logging to avoid ToStringing prematurely.
internal enum ConnectionState
{
Disconnected,
Connecting,
Connected,
Disposed
}
private class NegotiationResponse

View File

@ -151,11 +151,25 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
LoggerMessage.Define<DateTime, string>(LogLevel.Information, new EventId(18, nameof(StoppingClient)), "{time}: Connection Id {connectionId}: Stopping client.");
private static readonly Action<ILogger, DateTime, string, string, Exception> _exceptionThrownFromCallback =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, new EventId(19, nameof(ExceptionThrownFromCallback)), "{time}: Connection Id {connectionId}: An exception was thrown from the '{callback}' callback");
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, new EventId(19, nameof(ExceptionThrownFromCallback)), "{time}: Connection Id {connectionId}: An exception was thrown from the '{callback}' callback.");
private static readonly Action<ILogger, DateTime, string, Exception> _disposingClient =
LoggerMessage.Define<DateTime, string>(LogLevel.Information, new EventId(20, nameof(DisposingClient)), "{time}: Connection Id {connectionId}: Disposing client.");
private static readonly Action<ILogger, DateTime, string, Exception> _abortingClient =
LoggerMessage.Define<DateTime, string>(LogLevel.Error, new EventId(20, nameof(AbortingClient)), "{time}: Connection Id {connectionId}: Aborting client.");
LoggerMessage.Define<DateTime, string>(LogLevel.Error, new EventId(21, nameof(AbortingClient)), "{time}: Connection Id {connectionId}: Aborting client.");
private static readonly Action<ILogger, Exception> _errorDuringClosedEvent =
LoggerMessage.Define(LogLevel.Error, new EventId(22, nameof(ErrorDuringClosedEvent)), "An exception was thrown in the handler for the Closed event.");
private static readonly Action<ILogger, DateTime, string, Exception> _skippingStop =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, new EventId(23, nameof(SkippingStop)), "{time}: Connection Id {connectionId}: Skipping stop, connection is already stopped.");
private static readonly Action<ILogger, DateTime, string, Exception> _skippingDispose =
LoggerMessage.Define<DateTime, string>(LogLevel.Debug, new EventId(24, nameof(SkippingDispose)), "{time}: Connection Id {connectionId}: Skipping dispose, connection is already disposed.");
private static readonly Action<ILogger, DateTime, string, string, string, Exception> _connectionStateChanged =
LoggerMessage.Define<DateTime, string, string, string>(LogLevel.Debug, new EventId(25, nameof(ConnectionStateChanged)), "{time}: Connection Id {connectionId}: Connection state changed from {previousState} to {newState}.");
public static void StartTransport(this ILogger logger, string connectionId, TransferMode transferMode)
{
@ -525,6 +539,38 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
}
}
public static void DisposingClient(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Information))
{
_disposingClient(logger, DateTime.Now, connectionId, null);
}
}
public static void SkippingDispose(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_skippingDispose(logger, DateTime.Now, connectionId, null);
}
}
public static void ConnectionStateChanged(this ILogger logger, string connectionId, HttpConnection.ConnectionState previousState, HttpConnection.ConnectionState newState)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_connectionStateChanged(logger, DateTime.Now, connectionId, previousState.ToString(), newState.ToString(), null);
}
}
public static void SkippingStop(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Debug))
{
_skippingStop(logger, DateTime.Now, connectionId, null);
}
}
public static void ExceptionThrownFromCallback(this ILogger logger, string connectionId, string callbackName, Exception exception)
{
if (logger.IsEnabled(LogLevel.Error))
@ -532,5 +578,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Internal
_exceptionThrownFromCallback(logger, DateTime.Now, connectionId, callbackName, exception);
}
}
public static void ErrorDuringClosedEvent(this ILogger logger, Exception exception)
{
_errorDuringClosedEvent(logger, exception);
}
}
}

View File

@ -103,6 +103,88 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
}
}
[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
public async Task CanStopAndStartConnection(IHubProtocol protocol, TransportType transportType, string path)
{
using (StartLog(out var loggerFactory))
{
const string originalMessage = "SignalR";
var httpConnection = new HttpConnection(new Uri(_serverFixture.Url + path), transportType, loggerFactory);
var connection = new HubConnection(httpConnection, protocol, loggerFactory);
try
{
await connection.StartAsync().OrTimeout();
var result = await connection.InvokeAsync<string>("Echo", originalMessage).OrTimeout();
Assert.Equal(originalMessage, result);
await connection.StopAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
result = await connection.InvokeAsync<string>("Echo", originalMessage).OrTimeout();
Assert.Equal(originalMessage, result);
}
catch (Exception ex)
{
loggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "Exception from test");
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}
[Fact]
public async Task CanStartConnectionFromClosedEvent()
{
using (StartLog(out var loggerFactory))
{
var logger = loggerFactory.CreateLogger<HubConnectionTests>();
const string originalMessage = "SignalR";
var httpConnection = new HttpConnection(new Uri(_serverFixture.Url + "/default"), loggerFactory);
var connection = new HubConnection(httpConnection, new JsonHubProtocol(), loggerFactory);
var restartTcs = new TaskCompletionSource<object>();
connection.Closed += async e =>
{
logger.LogInformation("Closed event triggered");
if (!restartTcs.Task.IsCompleted)
{
logger.LogInformation("Restarting connection");
await connection.StartAsync().OrTimeout();
logger.LogInformation("Restarted connection");
restartTcs.SetResult(null);
}
};
try
{
await connection.StartAsync().OrTimeout();
var result = await connection.InvokeAsync<string>("Echo", originalMessage).OrTimeout();
Assert.Equal(originalMessage, result);
logger.LogInformation("Stopping connection");
await connection.StopAsync().OrTimeout();
logger.LogInformation("Waiting for reconnect");
await restartTcs.Task.OrTimeout();
logger.LogInformation("Reconnection complete");
result = await connection.InvokeAsync<string>("Echo", originalMessage).OrTimeout();
Assert.Equal(originalMessage, result);
}
catch (Exception ex)
{
loggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "Exception from test");
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}
[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
public async Task MethodsAreCaseInsensitive(IHubProtocol protocol, TransportType transportType, string path)
@ -174,12 +256,30 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
var httpConnection = new HttpConnection(new Uri(_serverFixture.Url + path), transportType, loggerFactory);
var connection = new HubConnection(httpConnection, protocol, loggerFactory);
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
try
{
await connection.StartAsync().OrTimeout();
await connection.InvokeAsync("CallHandlerThatDoesntExist").OrTimeout();
await connection.DisposeAsync().OrTimeout();
await connection.Closed.OrTimeout();
await closeTcs.Task.OrTimeout();
}
catch (Exception ex)
{
loggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} during test: {Message}", ex.GetType().Name, ex.Message);
throw;
}
finally
{

View File

@ -10,6 +10,12 @@
<PackageReference Include="Microsoft.CSharp" Version="$(MicrosoftCSharpPackageVersion)" />
</ItemGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />

View File

@ -0,0 +1,275 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public partial class HttpConnectionTests
{
// Nested class for grouping
public class AbortAsync
{
[Fact]
public async Task AbortAsyncTriggersClosedEventWithException()
{
var connection = CreateConnection(out var closedTask);
try
{
// Start the connection
await connection.StartAsync().OrTimeout();
// Abort with an error
var expected = new Exception("Ruh roh!");
await connection.AbortAsync(expected).OrTimeout();
// Verify that it is thrown
var actual = await Assert.ThrowsAsync<Exception>(async () => await closedTask.OrTimeout());
Assert.Same(expected, actual);
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task AbortAsyncWhileStoppingTriggersClosedEventWithException()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints));
try
{
// Start the connection
await connection.StartAsync().OrTimeout();
// Stop normally
var stopTask = connection.StopAsync().OrTimeout();
// Wait to reach the first sync point
await syncPoints[0].WaitForSyncPoint().OrTimeout();
// Abort with an error
var expected = new Exception("Ruh roh!");
var abortTask = connection.AbortAsync(expected).OrTimeout();
// Wait for the sync point to hit again
await syncPoints[1].WaitForSyncPoint().OrTimeout();
// Release sync point 0
syncPoints[0].Continue();
// We should close with the error from Abort (because it was set by the call to Abort even though Stop triggered the close)
var actual = await Assert.ThrowsAsync<Exception>(async () => await closedTask.OrTimeout());
Assert.Same(expected, actual);
// Clean-up
syncPoints[1].Continue();
await Task.WhenAll(stopTask, abortTask).OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task StopAsyncWhileAbortingTriggersClosedEventWithoutException()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints));
try
{
// Start the connection
await connection.StartAsync().OrTimeout();
// Abort with an error
var expected = new Exception("Ruh roh!");
var abortTask = connection.AbortAsync(expected).OrTimeout();
// Wait to reach the first sync point
await syncPoints[0].WaitForSyncPoint().OrTimeout();
// Stop normally, without a sync point.
// This should clear the exception, meaning Closed will not "throw"
syncPoints[1].Continue();
await connection.StopAsync();
await closedTask.OrTimeout();
// Clean-up
syncPoints[0].Continue();
await abortTask.OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress()
{
var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(out var syncPoint));
try
{
// Start the connection
await connection.StartAsync().OrTimeout();
// Abort with an error
var expected = new Exception("Ruh roh!");
var abortTask = connection.AbortAsync(expected).OrTimeout();
// Wait to reach the first sync point
await syncPoint.WaitForSyncPoint().OrTimeout();
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync().OrTimeout());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", ex.Message);
// Release the sync point and wait for close to complete
// (it will throw the abort exception)
syncPoint.Continue();
await abortTask.OrTimeout();
var actual = await Assert.ThrowsAsync<Exception>(() => closedTask.OrTimeout());
Assert.Same(expected, actual);
// We can start now
await connection.StartAsync().OrTimeout();
// And we can stop without getting the abort exception.
await connection.StopAsync().OrTimeout();
}
finally
{
// Dispose should be clean and exception free.
await connection.DisposeAsync().OrTimeout();
}
}
private HttpConnection CreateConnection(out Task closedTask, Func<Task> stopHandler = null)
{
var httpHandler = new TestHttpMessageHandler();
var transportFactory = new TestTransportFactory(new TestTransport(stopHandler));
var connection = new HttpConnection(
new Uri("http://fakeuri.org/"),
transportFactory,
NullLoggerFactory.Instance,
new HttpOptions()
{
HttpMessageHandler = httpHandler,
});
var closedTcs = new TaskCompletionSource<object>();
connection.Closed += ex =>
{
if (ex != null)
{
closedTcs.SetException(ex);
}
else
{
closedTcs.SetResult(null);
}
};
closedTask = closedTcs.Task;
return connection;
}
private class TestTransport : ITransport
{
private Channel<byte[], SendMessage> _application;
private readonly Func<Task> _stopHandler;
public TransferMode? Mode => TransferMode.Text;
public TestTransport(Func<Task> stopHandler)
{
_stopHandler = stopHandler ?? new Func<Task>(() => Task.CompletedTask);
}
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
_application = application;
return Task.CompletedTask;
}
public async Task StopAsync()
{
await _stopHandler();
_application.Writer.TryComplete();
}
}
// Possibly useful as a general-purpose async testing helper?
private class SyncPoint
{
private TaskCompletionSource<object> _atSyncPoint = new TaskCompletionSource<object>();
private TaskCompletionSource<object> _continueFromSyncPoint = new TaskCompletionSource<object>();
// Used by the test code to wait and continue
public Task WaitForSyncPoint() => _atSyncPoint.Task;
public void Continue() => _continueFromSyncPoint.TrySetResult(null);
// Used by the code under test to wait for the test code to release it.
public Task WaitToContinue()
{
_atSyncPoint.TrySetResult(null);
return _continueFromSyncPoint.Task;
}
public static Func<Task> Create(out SyncPoint syncPoint)
{
var handler = Create(1, out var syncPoints);
syncPoint = syncPoints[0];
return handler;
}
/// <summary>
/// Creates a re-entrant function that waits for sync points in sequence.
/// </summary>
/// <param name="count">The number of sync points to expect</param>
/// <param name="syncPoints">The <see cref="SyncPoint"/> objects that can be used to coordinate the sync point</param>
/// <returns></returns>
public static Func<Task> Create(int count, out SyncPoint[] syncPoints)
{
// Need to use a local so the closure can capture it. You can't use out vars in a closure.
var localSyncPoints = new SyncPoint[count];
for (var i = 0; i < count; i += 1)
{
localSyncPoints[i] = new SyncPoint();
}
syncPoints = localSyncPoints;
var counter = 0;
return () =>
{
if (counter >= localSyncPoints.Length)
{
return Task.CompletedTask;
}
else
{
var syncPoint = localSyncPoints[counter];
counter += 1;
return syncPoint.WaitToContinue();
}
};
}
}
}
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Text;
@ -12,14 +13,20 @@ using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Moq;
using Moq.Protected;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
public class HttpConnectionTests
public partial class HttpConnectionTests : LoggedTest
{
public HttpConnectionTests(ITestOutputHelper output) : base(output)
{
}
[Fact]
public void CannotCreateConnectionWithNullUrl()
{
@ -53,7 +60,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -66,7 +73,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
}
finally
{
@ -75,7 +82,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
[Fact]
public async Task CannotStartStoppedConnection()
public async Task CannotStartConnectionDisposedAfterStarting()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
@ -83,7 +90,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -97,7 +104,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
}
[Fact]
@ -111,12 +118,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
}
}
[Fact]
public async Task CanStopStartingConnection()
public async Task CanDisposeStartingConnection()
{
// Used to make sure StartAsync is not completed before DisposeAsync is called
var releaseNegotiateTcs = new TaskCompletionSource<object>();
@ -134,13 +141,25 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
// allow DisposeAsync to continue once we know we are past the connection state check
allowDisposeTcs.SetResult(null);
await releaseNegotiateTcs.Task;
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var transport = new Mock<ITransport>();
transport.Setup(t => t.StopAsync()).Returns(async () => { await releaseDisposeTcs.Task; });
Channel<byte[], SendMessage> channel = null;
transport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
transport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((_, c, __, ___, ____) =>
{
channel = c;
return Task.CompletedTask;
});
transport.Setup(t => t.StopAsync()).Returns(async () =>
{
await releaseDisposeTcs.Task;
channel.Writer.TryComplete();
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(transport.Object), loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
@ -154,8 +173,214 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await startTask.OrTimeout();
releaseDisposeTcs.SetResult(null);
await disposeTask.OrTimeout();
}
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()), Times.Never);
[Fact]
public async Task CanStartConnectionThatFailedToStart()
{
var failNegotiate = true;
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (ResponseUtils.IsNegotiateRequest(request))
{
return failNegotiate
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
return ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
try
{
await connection.StartAsync().OrTimeout();
}
catch { }
failNegotiate = false;
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
[Fact]
public async Task CanStartStoppedConnection()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
await connection.StartAsync().OrTimeout();
await connection.StopAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
[Fact]
public async Task CanStopStartingConnection()
{
var allowStopTcs = new TaskCompletionSource<object>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (ResponseUtils.IsNegotiateRequest(request))
{
allowStopTcs.SetResult(null);
return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
else
{
var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null;
return (content?.Length == 1 && content[0] == 0x42)
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
}
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.TrySetResult(null);
var startTask = connection.StartAsync();
await allowStopTcs.Task.OrTimeout();
await Task.WhenAll(startTask, connection.StopAsync()).OrTimeout();
await closeTcs.Task.OrTimeout();
}
[Fact]
public async Task CanStartConnectionAfterConnectionStoppedWithError()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (ResponseUtils.IsNegotiateRequest(request))
{
return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null;
return (content?.Length == 1 && content[0] == 0x42)
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.TrySetResult(null);
await connection.StartAsync().OrTimeout();
try
{
await connection.SendAsync(new byte[] { 0x42 }).OrTimeout();
}
catch { }
await closeTcs.Task.OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
[Fact]
public async Task CanDisposeStoppedConnection()
{
var connection = new HttpConnection(new Uri("http://fakeuri.org/"));
await connection.StopAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
[Fact]
public async Task StoppingStoppingConnectionNoOps()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (ResponseUtils.IsNegotiateRequest(request))
{
return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
else
{
var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null;
return (content?.Length == 1 && content[0] == 0x42)
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
}
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.TrySetResult(null);
await connection.StartAsync().OrTimeout();
await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.StopAsync().OrTimeout());
await closeTcs.Task.OrTimeout();
}
[Fact]
public async Task DisposedStoppingConnectionDisposesConnection()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (ResponseUtils.IsNegotiateRequest(request))
{
return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
else
{
var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null;
return (content?.Length == 1 && content[0] == 0x42)
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
}
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.TrySetResult(null);
await connection.StartAsync().OrTimeout();
await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.DisposeAsync().OrTimeout());
await closeTcs.Task.OrTimeout();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.StartAsync());
Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message);
}
[Fact]
@ -176,7 +401,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -201,7 +426,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -209,11 +434,21 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
await connection.Closed.OrTimeout();
// in case of clean disconnect error should be null
await closeTcs.Task.OrTimeout();
}
[Fact]
@ -228,18 +463,30 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return request.Method == HttpMethod.Get
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: IsNegotiateRequest(request)
: ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
try
{
await connection.StartAsync().OrTimeout();
await Assert.ThrowsAsync<HttpRequestException>(() => connection.Closed.OrTimeout());
await Assert.ThrowsAsync<HttpRequestException>(() => closeTcs.Task.OrTimeout());
}
finally
{
@ -247,54 +494,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
}
[Fact]
public async Task ReceivedCallbackNotRaisedAfterConnectionIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
// The connection is now in the Disconnected state so the Received event for
// this message should not be raised
channel.Writer.TryWrite(Array.Empty<byte>());
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
var onReceivedInvoked = false;
connection.OnReceived(_ =>
{
onReceivedInvoked = true;
return Task.CompletedTask;
});
await connection.StartAsync();
await connection.DisposeAsync();
Assert.False(onReceivedInvoked);
}
[Fact]
public async Task EventsAreNotRunningOnMainLoop()
{
@ -304,7 +503,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -355,46 +554,63 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
[Fact]
public async Task EventQueueTimeout()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
using (StartLog(out var loggerFactory))
{
var logger = loggerFactory.CreateLogger<HttpConnectionTests>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
logger.LogInformation("Transport started");
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
logger.LogInformation("Transport stopped");
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
var blockReceiveCallbackTcs = new TaskCompletionSource<object>();
var onReceivedCalledTcs = new TaskCompletionSource<object>();
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
connection.OnReceived(async _ =>
{
await Task.Yield();
return IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
onReceivedCalledTcs.TrySetResult(null);
await blockReceiveCallbackTcs.Task;
});
var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
logger.LogInformation("Starting connection");
await connection.StartAsync().OrTimeout();
logger.LogInformation("Started connection");
channel.Writer.TryWrite(Array.Empty<byte>());
await onReceivedCalledTcs.Task.OrTimeout();
var blockReceiveCallbackTcs = new TaskCompletionSource<object>();
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.Reader.TryRead(out var message));
var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null,
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
connection.OnReceived(_ => blockReceiveCallbackTcs.Task);
await connection.StartAsync();
channel.Writer.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.Reader.TryRead(out var message));
await connection.DisposeAsync();
logger.LogInformation("Disposing connection");
await connection.DisposeAsync().OrTimeout(TimeSpan.FromSeconds(10));
logger.LogInformation("Disposed connection");
}
}
[Fact]
@ -406,7 +622,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -449,9 +665,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{
var connection = new HttpConnection(new Uri("http://fakeuri.org/"));
var closeInvoked = false;
connection.Closed += e => closeInvoked = true;
await connection.DisposeAsync();
Assert.False(connection.Closed.IsCompleted);
Assert.False(closeInvoked);
}
[Fact]
@ -464,7 +681,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -502,7 +719,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
if (IsNegotiateRequest(request))
if (ResponseUtils.IsNegotiateRequest(request))
{
return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse());
}
@ -557,7 +774,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
content = "T2:T:42;";
}
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK, content);
});
@ -584,7 +801,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: request.Method == HttpMethod.Post
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
@ -618,7 +835,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
content = "42";
}
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK, content);
});
@ -635,18 +852,17 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return Task.CompletedTask;
}, receiveTcs);
_ = connection.Closed.ContinueWith(task =>
connection.Closed += e =>
{
if (task.Exception != null)
if (e != null)
{
receiveTcs.TrySetException(task.Exception);
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
return Task.CompletedTask;
});
};
await connection.StartAsync().OrTimeout();
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
@ -674,7 +890,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
content = "42";
}
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK, content);
});
@ -698,18 +914,17 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return Task.CompletedTask;
});
_ = connection.Closed.ContinueWith(task =>
connection.Closed += e =>
{
if (task.Exception != null)
if (e != null)
{
receiveTcs.TrySetException(task.Exception);
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
return Task.CompletedTask;
});
};
await connection.StartAsync();
@ -738,7 +953,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
content = "42";
}
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK, content);
});
@ -762,18 +977,17 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return Task.CompletedTask;
});
_ = connection.Closed.ContinueWith(task =>
connection.Closed += e =>
{
if (task.Exception != null)
if (e != null)
{
receiveTcs.TrySetException(task.Exception);
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
return Task.CompletedTask;
});
};
await connection.StartAsync();
@ -797,7 +1011,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
return request.Method == HttpMethod.Get
? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)
: IsNegotiateRequest(request)
: ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -806,12 +1020,21 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object });
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
await connection.StartAsync().OrTimeout();
// Exception in send should shutdown the connection
await Assert.ThrowsAsync<HttpRequestException>(() => connection.Closed.OrTimeout());
await Assert.ThrowsAsync<HttpRequestException>(() => closeTcs.Task.OrTimeout());
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync(new byte[0]));
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message);
@ -918,7 +1141,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return IsNegotiateRequest(request)
return ResponseUtils.IsNegotiateRequest(request)
? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse())
: ResponseUtils.CreateResponse(HttpStatusCode.OK);
});
@ -977,11 +1200,5 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await connection.StartAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
private bool IsNegotiateRequest(HttpRequestMessage request)
{
return request.Method == HttpMethod.Post &&
new UriBuilder(request.RequestUri).Path.EndsWith("/negotiate");
}
}
}

View File

@ -73,10 +73,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
var hubConnection = new HubConnection(new TestConnection(), Mock.Of<IHubProtocol>(), null);
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closedEventTcs.SetResult(e);
await hubConnection.StartAsync().OrTimeout();
await hubConnection.DisposeAsync().OrTimeout();
await hubConnection.Closed.OrTimeout();
Assert.Null(await closedEventTcs.Task);
}
[Fact]
@ -182,9 +183,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await hubConnection.StartAsync();
var invokeTask = hubConnection.InvokeAsync<int>("testMethod");
await hubConnection.DisposeAsync();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await invokeTask);
var exception = new InvalidOperationException();
mockConnection.Raise(m => m.Closed += null, exception);
var actualException = await Assert.ThrowsAsync<InvalidOperationException>(async () => await invokeTask);
Assert.Equal(exception, actualException);
}
[Fact]
@ -196,8 +200,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
hubConnection.ServerTimeout = TimeSpan.FromMilliseconds(100);
await hubConnection.StartAsync().OrTimeout();
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await hubConnection.Closed.OrTimeout());
Assert.Equal("Server timeout (100.00ms) elapsed without receiving a message from the server.", ex.Message);
var closeTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += ex => closeTcs.TrySetResult(ex);
var exception = Assert.IsType<TimeoutException>(await closeTcs.Task.OrTimeout());
Assert.Equal("Server timeout (100.00ms) elapsed without receiving a message from the server.", exception.Message);
}
// Moq really doesn't handle out parameters well, so to make these tests work I added a manual mock -anurse

View File

@ -4,6 +4,12 @@
<TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Net;
using System.Net.Http;
using System.Text;
@ -28,6 +29,12 @@ namespace Microsoft.AspNetCore.Client.Tests
};
}
public static bool IsNegotiateRequest(HttpRequestMessage request)
{
return request.Method == HttpMethod.Post &&
new UriBuilder(request.RequestUri).Path.EndsWith("/negotiate");
}
public static string CreateNegotiationResponse(string connectionId = "00000000-0000-0000-0000-000000000000",
SocketsTransportType? transportTypes = SocketsTransportType.All)
{

View File

@ -75,25 +75,23 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
.Returns<HttpRequestMessage, CancellationToken>((request, cancellationToken) =>
{
await Task.Yield();
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!eventStreamCts.IsCancellationRequested)
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!eventStreamCts.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length);
}
});
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
return Task.FromResult(new HttpResponseMessage { Content = new StreamContent(mockStream.Object) });
});
Task transportActiveTask;

View File

@ -30,14 +30,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private Task _receiveLoop;
private TransferMode? _transferMode;
private readonly TaskCompletionSource<object> _closeTcs = new TaskCompletionSource<object>();
public Task Closed => _closeTcs.Task;
public event Action<Exception> Closed;
public Task Started => _started.Task;
public Task Disposed => _disposed.Task;
public ChannelReader<byte[]> SentMessages => _sentMessages.Reader;
public ChannelWriter<byte[]> ReceivedMessages => _receivedMessages.Writer;
private bool _closed;
private object _closedLock = new object();
private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
public IFeatureCollection Features { get; } = new FeatureCollection();
@ -51,19 +53,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public Task AbortAsync(Exception ex) => DisposeCoreAsync(ex);
public Task DisposeAsync() => DisposeCoreAsync();
// TestConnection isn't restartable
public Task StopAsync() => DisposeAsync();
private Task DisposeCoreAsync(Exception ex = null)
{
if (ex == null)
{
_closeTcs.TrySetResult(null);
_disposed.TrySetResult(null);
}
else
{
_closeTcs.TrySetException(ex);
_disposed.TrySetException(ex);
}
TriggerClosed(ex);
_receiveShutdownToken.Cancel();
return _receiveLoop;
}
@ -147,16 +142,28 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
}
}
_closeTcs.TrySetResult(null);
TriggerClosed();
}
catch (OperationCanceledException)
{
// Do nothing, we were just asked to shut down.
_closeTcs.TrySetResult(null);
TriggerClosed();
}
catch (Exception ex)
{
_closeTcs.TrySetException(ex);
TriggerClosed(ex);
}
}
private void TriggerClosed(Exception ex = null)
{
lock (_closedLock)
{
if (!_closed)
{
_closed = true;
Closed?.Invoke(ex);
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Client.Tests;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
public class TestHttpMessageHandler : HttpMessageHandler
{
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (ResponseUtils.IsNegotiateRequest(request))
{
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.OK,
ResponseUtils.CreateNegotiationResponse()));
}
else
{
return Task.FromException<HttpResponseMessage>(new InvalidOperationException($"Http endpoint not implemented: {request.RequestUri}"));
}
}
}
}

View File

@ -4,6 +4,12 @@
<TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Common\Microsoft.AspNetCore.SignalR.Common.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />

View File

@ -4,6 +4,12 @@
<TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Sockets\Microsoft.AspNetCore.Sockets.csproj" />

View File

@ -164,6 +164,19 @@ namespace Microsoft.AspNetCore.SignalR.Tests
new TransferModeFeature { TransferMode = requestedTransferMode });
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
var receiveTcs = new TaskCompletionSource<string>();
connection.OnReceived((data, state) =>
{
@ -208,7 +221,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
logger.LogInformation("Receiving message");
Assert.Equal(message, await receiveTcs.Task.OrTimeout());
logger.LogInformation("Completed receive");
await connection.Closed.OrTimeout();
await closeTcs.Task.OrTimeout();
}
catch (Exception ex)
{
@ -325,11 +338,22 @@ namespace Microsoft.AspNetCore.SignalR.Tests
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e =>
{
if (e != null)
{
closeTcs.SetException(e);
}
else
{
closeTcs.SetResult(null);
}
};
logger.LogInformation("Starting connection to {url}", url);
await connection.StartAsync().OrTimeout();
await connection.Closed.OrTimeout();
await closeTcs.Task.OrTimeout();
}
catch (Exception ex)
{

View File

@ -15,6 +15,12 @@
<PackageReference Include="Microsoft.CSharp" Version="$(MicrosoftCSharpPackageVersion)" />
</ItemGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />

View File

@ -5,6 +5,12 @@
<RuntimeIdentifier Condition="'$(TargetFramework)' == 'net461'">win7-x86</RuntimeIdentifier>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Sockets.Http\Microsoft.AspNetCore.Sockets.Http.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />

4
test/xunit.runner.json Normal file
View File

@ -0,0 +1,4 @@
{
"longRunningTestSeconds": 5,
"diagnosticMessages": false
}