Removing Dispose from Transport, Connection and HubConnection

Converting StopAsync to DisposeAsync
This commit is contained in:
moozzyk 2017-02-16 14:15:47 -08:00
parent 76bd114a2f
commit 7a4746868a
12 changed files with 372 additions and 335 deletions

View File

@ -30,7 +30,8 @@ namespace ClientSample
{
logger.LogInformation("Connecting to {0}", baseUrl);
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri(baseUrl), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri(baseUrl), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
logger.LogInformation("Connected to {0}", baseUrl);
@ -58,6 +59,10 @@ namespace ClientSample
await connection.Invoke<object>("Send", line);
}
}
finally
{
await connection.DisposeAsync();
}
}
}
}

View File

@ -30,7 +30,8 @@ namespace ClientSample
{
logger.LogInformation("Connecting to {0}", baseUrl);
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new Connection(new Uri(baseUrl), loggerFactory))
var connection = new Connection(new Uri(baseUrl), loggerFactory);
try
{
var cts = new CancellationTokenSource();
connection.Received += (data, format) => logger.LogInformation($"Received: {Encoding.UTF8.GetString(data)}");
@ -48,8 +49,10 @@ namespace ClientSample
};
await StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
await connection.StopAsync();
}
finally
{
await connection.DisposeAsync();
}
}
}

View File

@ -16,7 +16,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.SignalR.Client
{
public class HubConnection : IDisposable
public class HubConnection
{
private readonly ILogger _logger;
private readonly Connection _connection;
@ -71,14 +71,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
await _connection.StartAsync(transport, httpClient);
}
public async Task StopAsync()
public async Task DisposeAsync()
{
await _connection.StopAsync();
}
public void Dispose()
{
_connection.Dispose();
await _connection.DisposeAsync();
}
// TODO: Client return values/tasks?

View File

@ -12,7 +12,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Sockets.Client
{
public class Connection : IDisposable
public class Connection
{
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
@ -212,7 +212,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
return false;
}
public async Task StopAsync()
public async Task DisposeAsync()
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
@ -232,21 +232,6 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
}
public void Dispose()
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
if (_transportChannel != null)
{
Output.TryComplete();
}
if (_transport != null)
{
_transport.Dispose();
}
}
private class ConnectionState
{
public const int Initial = 0;

View File

@ -6,7 +6,7 @@ using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Sockets.Client
{
public interface ITransport : IDisposable
public interface ITransport
{
Task StartAsync(Uri url, IChannelConnection<Message> application);
Task StopAsync();

View File

@ -42,7 +42,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
_poller = Poll(Utils.AppendPath(url, "poll"), _transportCts.Token);
_sender = SendMessages(Utils.AppendPath(url, "send"), _transportCts.Token);
Running = Task.WhenAll(_sender, _poller).ContinueWith(t => {
Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
{
_application.Output.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
@ -53,12 +54,14 @@ namespace Microsoft.AspNetCore.Sockets.Client
public async Task StopAsync()
{
_transportCts.Cancel();
await Running;
}
public void Dispose()
{
_transportCts.Cancel();
try
{
await Running;
}
catch
{
// exceptions have been handled in the Running task continuation by closing the channel with the exception
}
}
private async Task Poll(Uri pollUrl, CancellationToken cancellationToken)

View File

@ -42,8 +42,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
throw new ArgumentNullException(nameof(application));
}
_application = application;
await Connect(url);
var sendTask = SendMessages(url, _cancellationToken);
var receiveTask = ReceiveMessages(url, _cancellationToken);
@ -154,14 +155,19 @@ namespace Microsoft.AspNetCore.Sockets.Client
await _webSocket.ConnectAsync(uriBuilder.Uri, _cancellationToken);
}
public void Dispose()
{
_webSocket.Dispose();
}
public async Task StopAsync()
{
await _webSocket.CloseAsync(WebSocketCloseStatus.Empty, null, _cancellationToken);
_webSocket.Dispose();
try
{
await Running;
}
catch
{
// exceptions have been handled in the Running task continuation by closing the channel with the exception
}
}
}
}

View File

@ -51,7 +51,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
@ -59,6 +60,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
Assert.Equal("Hello World!", result);
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -71,7 +76,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
@ -79,6 +85,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
Assert.Equal(originalMessage, result);
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -91,7 +101,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
@ -99,6 +110,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
Assert.Equal(originalMessage, result);
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -111,7 +126,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
@ -125,6 +141,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
Assert.Equal(originalMessage, await tcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -136,7 +156,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
using (var httpClient = _testServer.CreateClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory))
var connection = new HubConnection(new Uri("http://test/hubs"), new JsonNetInvocationAdapter(), loggerFactory);
try
{
await connection.StartAsync(transport, httpClient);
@ -145,6 +166,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
Assert.Equal(ex.Message, "Unknown hub method '!@#$%'");
}
finally
{
await connection.DisposeAsync();
}
}
}

View File

@ -64,11 +64,12 @@ namespace Microsoft.AspNetCore.SignalR.Tests
using (var httpClient = new HttpClient())
{
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory))
var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory);
try
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
connection.Closed += e =>
{
if (e != null)
{
@ -87,8 +88,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var receiveData = new ReceiveData();
Assert.Equal(message, await receiveTcs.Task.OrTimeout());
await connection.StopAsync();
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -111,7 +114,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var loggerFactory = new LoggerFactory();
var transport = new WebSocketsTransport();
using (var connection = new ClientConnection(new Uri(baseUrl + "/echo/ws"), loggerFactory))
var connection = new ClientConnection(new Uri(baseUrl + "/echo/ws"), loggerFactory);
try
{
var receiveTcs = new TaskCompletionSource<byte[]>();
connection.Received += (data, messageType) => receiveTcs.SetResult(data);
@ -124,8 +128,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var receivedData = await receiveTcs.Task.OrTimeout();
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
await connection.StopAsync();
}
finally
{
await connection.DisposeAsync();
}
}
}

View File

@ -31,12 +31,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
Assert.Equal(connectionUrl, new Connection(connectionUrl).Url);
}
[Fact]
public void CanDisposeNotStartedConnection()
{
using (new Connection(new Uri("http://fakeuri.org"))) { }
}
[Fact]
public async Task CannotStartRunningConnection()
{
@ -50,16 +44,21 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
await connection.StartAsync(longPollingTransport, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
await connection.StopAsync();
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
await connection.StartAsync(longPollingTransport, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -76,11 +75,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
@ -93,10 +93,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
public async Task CannotStartDisposedConnection()
{
using (var httpClient = new HttpClient())
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
connection.Dispose();
await connection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(longPollingTransport));
@ -108,14 +108,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
[Fact]
public async Task SendReturnsFalseIfConnectionIsNotStarted()
{
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary));
}
var connection = new Connection(new Uri("http://fakeuri.org/"));
Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary));
}
[Fact]
public async Task SendReturnsFalseIfConnectionIsStopped()
public async Task SendReturnsFalseIfConnectionIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
@ -127,24 +125,17 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
await connection.DisposeAsync();
Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary));
}
}
[Fact]
public async Task SendReturnsFalseIfConnectionIsDisposed()
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
connection.Dispose();
Assert.False(await connection.SendAsync(new byte[0], MessageType.Binary));
}
[Fact]
public async Task ConnectedEventRaisedWhenTheClientIsConnected()
{
@ -158,15 +149,23 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(longPollingTransport, httpClient);
Assert.True(connectedEventRaised);
}
finally
{
await connection.DisposeAsync();
}
Assert.True(connectedEventRaised);
}
}
@ -187,13 +186,21 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
.Returns(Task.FromException(new InvalidOperationException("Transport failed to start")));
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(mockTransport.Object, httpClient));
try
{
connection.Connected += () => connectedEventRaised = true;
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.StartAsync(mockTransport.Object, httpClient));
}
finally
{
await connection.DisposeAsync();
}
Assert.False(connectedEventRaised);
}
@ -212,47 +219,18 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.SetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
await connection.DisposeAsync();
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
// in case of clean disconnect error should be null
Assert.Null(await closedEventTcs.Task);
}
}
[Fact]
public async Task ClosedEventRaisedWhenTheClientIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.TrySetResult(e);
using (connection)
{
await connection.StartAsync(longPollingTransport, httpClient);
}
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
Assert.Null(await closedEventTcs.Task);
Assert.Null(await closedEventTcs.Task.OrTimeout());
}
}
@ -273,29 +251,34 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
var closedEventTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closedEventTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
Assert.IsType<HttpRequestException>(await closedEventTcs.Task);
try
{
await connection.StartAsync(longPollingTransport, httpClient);
Assert.IsType<HttpRequestException>(await closedEventTcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
}
}
[Fact]
public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
bool closedEventRaised = false;
connection.Closed += e => closedEventRaised = true;
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StopAsync();
Assert.False(closedEventRaised);
}
bool closedEventRaised = false;
connection.Closed += e => closedEventRaised = true;
await connection.DisposeAsync();
Assert.False(closedEventRaised);
}
[Fact]
@ -311,40 +294,20 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
await connection.StartAsync(longPollingTransport, httpClient);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
Assert.False(longPollingTransport.Running.IsCompleted);
await connection.StopAsync();
await longPollingTransport.Running.OrTimeout();
}
}
[Fact]
public async Task TransportIsClosedWhenConnectionIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
try
{
await connection.StartAsync(longPollingTransport, httpClient);
Assert.False(longPollingTransport.Running.IsCompleted);
}
finally
{
await connection.DisposeAsync();
}
await longPollingTransport.Running.OrTimeout();
}
@ -368,17 +331,23 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
await connection.StartAsync(longPollingTransport, httpClient);
var data = new byte[] { 1, 1, 2, 3, 5, 8 };
await connection.SendAsync(data, MessageType.Binary);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
await connection.StartAsync(longPollingTransport, httpClient);
Assert.Equal(data, await sendTcs.Task.OrTimeout());
var data = new byte[] { 1, 1, 2, 3, 5, 8 };
await connection.SendAsync(data, MessageType.Binary);
await connection.StopAsync();
Assert.Equal(data, await sendTcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -401,29 +370,34 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
{
if (e != null)
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
{
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
};
if (e != null)
{
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
};
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(longPollingTransport, httpClient);
// TODO: timeout
Assert.Equal("42", await receiveTcs.Task);
await connection.StopAsync();
Assert.Equal("42", await receiveTcs.Task.OrTimeout());
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -440,11 +414,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
await connection.DisposeAsync();
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
}
@ -466,18 +441,25 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(longPollingTransport, httpClient);
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
finally
{
await connection.DisposeAsync();
}
}
}
@ -498,18 +480,25 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StartAsync(longPollingTransport, httpClient);
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
finally
{
await connection.DisposeAsync();
}
}
}
}

View File

@ -1,4 +1,4 @@
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Client;
using Microsoft.Extensions.Logging;
using Moq;
@ -23,19 +23,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
[Fact]
public void CanDisposeNotStartedHubConnection()
public async Task CanDisposeNotStartedHubConnection()
{
using (new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>()))
{ }
}
[Fact]
public async Task CanStopNotStartedHubConnection()
{
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>()))
{
await hubConnection.StopAsync();
}
await new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>())
.DisposeAsync();
}
[Fact]
@ -51,16 +42,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory()))
{
await hubConnection.StartAsync(longPollingTransport, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
await hubConnection.StopAsync();
try
{
await hubConnection.StartAsync(longPollingTransport, httpClient);
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
finally
{
await hubConnection.DisposeAsync();
}
}
}
@ -77,27 +74,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory()))
{
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StopAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
Assert.Equal("Cannot start a connection that is not in the Initial state.", exception.Message);
}
}
[Fact]
public async Task CannotStartDisposedHubConnection()
{
using (var httpClient = new HttpClient())
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org/"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
hubConnection.Dispose();
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(
async () => await hubConnection.StartAsync(longPollingTransport));
@ -109,27 +91,24 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact(Skip = "Not implemented")]
public async Task InvokeThrowsIfHubConnectionNotStarted()
{
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>()))
{
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.Invoke<int>("test"));
Assert.Equal("Cannot invoke methods on non-started connections.", exception.Message);
}
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>());
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.Invoke<int>("test"));
Assert.Equal("Cannot invoke methods on non-started connections.", exception.Message);
}
[Fact(Skip = "Not implemented")]
public async Task InvokeThrowsIfHubConnectionDisposed()
{
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>()))
{
hubConnection.Dispose();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.Invoke<int>("test"));
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>());
await hubConnection.DisposeAsync();
var exception =
await Assert.ThrowsAsync<InvalidOperationException>(async () => await hubConnection.Invoke<int>("test"));
Assert.Equal("Cannot invoke methods on disposed connections.", exception.Message);
}
}
// TODO: If HubConnection takes (I)Connection we could just tests if events are wired up
// TODO: If HubConnection takes (I)Connection we could just tests if events are wired up
[Fact]
public async Task HubConnectionConnectedEventRaisedWhenTheClientIsConnected()
@ -144,15 +123,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory()))
{
var connectedEventRaised = false;
hubConnection.Connected += () => connectedEventRaised = true;
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
try
{
var connectedEventRaised = false;
hubConnection.Connected += () => connectedEventRaised = true;
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StartAsync(longPollingTransport, httpClient);
Assert.True(connectedEventRaised);
Assert.True(connectedEventRaised);
}
finally
{
await hubConnection.DisposeAsync();
}
}
}
@ -169,19 +155,16 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory()))
{
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closedEventTcs.SetResult(e);
await hubConnection.StartAsync(longPollingTransport, httpClient);
await hubConnection.StopAsync();
await hubConnection.DisposeAsync();
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
// in case of clean disconnect error should be null
Assert.Null(await closedEventTcs.Task);
Assert.Null(await closedEventTcs.Task.OrTimeout());
}
}
}

View File

@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
public class LongPollingTransportTests
{
[Fact]
public async Task LongPollingTransportStopsPollAndSendLoopsWhenTransportDisposed()
public async Task LongPollingTransportStopsPollAndSendLoopsWhenTransportStopped()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
@ -33,19 +33,27 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
Task transportActiveTask;
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
transportActiveTask = longPollingTransport.Running;
try
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
Assert.False(transportActiveTask.IsCompleted);
transportActiveTask = longPollingTransport.Running;
Assert.False(transportActiveTask.IsCompleted);
}
finally
{
await longPollingTransport.StopAsync();
}
await transportActiveTask.OrTimeout();
}
await transportActiveTask.OrTimeout();
}
[Fact]
@ -61,15 +69,23 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
await longPollingTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.Completion.IsCompleted);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
await longPollingTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.Completion.IsCompleted);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
@ -86,16 +102,23 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var exception =
await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion.OrTimeout());
Assert.Contains(" 500 ", exception.Message);
var exception =
await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion.OrTimeout());
Assert.Contains(" 500 ", exception.Message);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
@ -115,25 +138,32 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
await connectionToTransport.Out.WriteAsync(new Message());
await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
// The channel needs to be drained for the Completion task to be completed
while (transportToConnection.In.TryRead(out Message message))
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
message.Dispose();
}
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);
Assert.Contains(" 500 ", exception.Message);
await connectionToTransport.Out.WriteAsync(new Message());
await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
// The channel needs to be drained for the Completion task to be completed
while (transportToConnection.In.TryRead(out Message message))
{
message.Dispose();
}
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);
Assert.Contains(" 500 ", exception.Message);
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
@ -150,19 +180,26 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory());
try
{
var connectionToTransport = Channel.CreateUnbounded<Message>();
var transportToConnection = Channel.CreateUnbounded<Message>();
var channelConnection = new ChannelConnection<Message>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection);
connectionToTransport.Out.Complete();
connectionToTransport.Out.Complete();
await longPollingTransport.Running.OrTimeout();
await longPollingTransport.Running.OrTimeout();
await longPollingTransport.Running.OrTimeout();
await connectionToTransport.In.Completion.OrTimeout();
await longPollingTransport.Running.OrTimeout();
await connectionToTransport.In.Completion.OrTimeout();
}
finally
{
await longPollingTransport.StopAsync();
}
}
}
}