Raising Received event

This commit is contained in:
moozzyk 2017-02-09 10:50:37 -08:00
parent 4f6306f352
commit d4fbdd055a
5 changed files with 99 additions and 153 deletions

View File

@ -32,11 +32,14 @@ namespace ClientSample
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new Connection(new Uri(baseUrl), loggerFactory))
{
var cts = new CancellationTokenSource();
connection.Received += (data, format) => logger.LogInformation($"Received: {Encoding.UTF8.GetString(data)}");
connection.Closed += e => cts.Cancel();
await connection.StartAsync(transport, httpClient);
logger.LogInformation("Connected to {0}", baseUrl);
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, a) =>
{
a.Cancel = true;
@ -44,13 +47,8 @@ namespace ClientSample
cts.Cancel();
};
// Ready to start the loops
var receive =
StartReceiving(loggerFactory.CreateLogger("ReceiveLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
var send =
StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
await StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel());
await Task.WhenAll(receive, send);
await connection.StopAsync();
}
}
@ -68,28 +66,5 @@ namespace ClientSample
}
logger.LogInformation("Send loop terminated");
}
private static async Task StartReceiving(ILogger logger, Connection connection, CancellationToken cancellationToken)
{
logger.LogInformation("Receive loop starting");
try
{
var receiveData = new ReceiveData();
while (await connection.ReceiveAsync(receiveData, cancellationToken))
{
logger.LogInformation($"Received: {Encoding.UTF8.GetString(receiveData.Data)}");
}
}
catch (OperationCanceledException)
{
logger.LogInformation("Connection is closing");
}
catch (Exception ex)
{
logger.LogError(0, ex, "Connection terminated due to an exception");
}
logger.LogInformation("Receive loop terminated");
}
}
}

View File

@ -18,13 +18,11 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public class HubConnection : IDisposable
{
private readonly Task _reader;
private readonly ILogger _logger;
private readonly Connection _connection;
private readonly IInvocationAdapter _adapter;
private readonly HubBinder _binder;
private readonly CancellationTokenSource _readerCts = new CancellationTokenSource();
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
// We need to ensure pending calls added after a connection failure don't hang. Right now the easiest thing to do is lock.
@ -42,7 +40,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
_adapter = adapter;
_logger = logger;
_reader = ReceiveMessages(_readerCts.Token);
// TODO HIGH: Need to subscribe to events before starting connection. Requires converting HubConnection
_connection.Received += OnDataReceived;
}
// TODO: Client return values/tasks?
@ -106,9 +105,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
return await irq.Completion.Task;
}
// TODO HIGH - need StopAsync
public void Dispose()
{
_readerCts.Cancel();
_connection.Dispose();
}
@ -124,49 +124,30 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new HubConnection(connection, adapter, loggerFactory.CreateLogger<HubConnection>());
}
private async Task ReceiveMessages(CancellationToken cancellationToken)
private async void OnDataReceived(byte[] data, MessageType messageType)
{
await Task.Yield();
var message
= await _adapter.ReadMessageAsync(new MemoryStream(data), _binder, _connectionActive.Token);
_logger.LogTrace("Beginning receive loop");
try
switch (message)
{
ReceiveData receiveData = new ReceiveData();
while (await _connection.ReceiveAsync(receiveData, cancellationToken))
{
var message
= await _adapter.ReadMessageAsync(new MemoryStream(receiveData.Data), _binder, cancellationToken);
switch (message)
case InvocationDescriptor invocationDescriptor:
DispatchInvocation(invocationDescriptor, _connectionActive.Token);
break;
case InvocationResultDescriptor invocationResultDescriptor:
InvocationRequest irq;
lock (_pendingCallsLock)
{
case InvocationDescriptor invocationDescriptor:
DispatchInvocation(invocationDescriptor, cancellationToken);
break;
case InvocationResultDescriptor invocationResultDescriptor:
InvocationRequest irq;
lock (_pendingCallsLock)
{
_connectionActive.Token.ThrowIfCancellationRequested();
irq = _pendingCalls[invocationResultDescriptor.Id];
_pendingCalls.Remove(invocationResultDescriptor.Id);
}
DispatchInvocationResult(invocationResultDescriptor, irq, cancellationToken);
break;
_connectionActive.Token.ThrowIfCancellationRequested();
irq = _pendingCalls[invocationResultDescriptor.Id];
_pendingCalls.Remove(invocationResultDescriptor.Id);
}
}
Shutdown();
}
catch (Exception ex)
{
Shutdown(ex);
throw;
}
finally
{
_logger.LogTrace("Ending receive loop");
DispatchInvocationResult(invocationResultDescriptor, irq, _connectionActive.Token);
break;
}
}
// TODO HIGH!
private void Shutdown(Exception ex = null)
{
_logger.LogTrace("Shutting down connection");

View File

@ -19,6 +19,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private int _connectionState = ConnectionState.Initial;
private IChannelConnection<Message> _transportChannel;
private ITransport _transport;
private Task _receiveLoopTask;
private ReadableChannel<Message> Input => _transportChannel.Input;
private WritableChannel<Message> Output => _transportChannel.Output;
@ -26,7 +27,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
public Uri Url { get; }
public event Action Connected;
public event Action<byte[], Format> Received;
public event Action<byte[], MessageType> Received;
public event Action<Exception> Closed;
public Connection(Uri url)
@ -67,6 +68,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
throw;
}
// start receive loop
_receiveLoopTask = ReceiveAsync();
Interlocked.Exchange(ref _connectionState, ConnectionState.Connected);
// Do not "simplify" - events can be removed from a different thread
@ -146,52 +150,36 @@ namespace Microsoft.AspNetCore.Sockets.Client
}
}
public Task<bool> ReceiveAsync(ReceiveData receiveData)
private async Task ReceiveAsync()
{
return ReceiveAsync(receiveData, CancellationToken.None);
}
public async Task<bool> ReceiveAsync(ReceiveData receiveData, CancellationToken cancellationToken)
{
if (receiveData == null)
{
throw new ArgumentNullException(nameof(receiveData));
}
if (Input.Completion.IsCompleted)
{
throw new InvalidOperationException("Cannot receive messages when the connection is stopped.");
}
try
{
while (await Input.WaitToReadAsync(cancellationToken))
_logger.LogTrace("Beginning receive loop");
while (await Input.WaitToReadAsync())
{
if (Input.TryRead(out Message message))
{
using (message)
{
receiveData.MessageType = message.Type;
receiveData.Data = message.Payload.Buffer.ToArray();
return true;
var receivedEventHandler = Received;
if (receivedEventHandler != null)
{
receivedEventHandler(message.Payload.Buffer.ToArray(), message.Type);
}
}
}
}
await Input.Completion;
}
catch (OperationCanceledException)
{
// channel is being closed
}
catch (Exception ex)
{
Output.TryComplete(ex);
_logger.LogError("Error receiving message: {0}", ex);
throw;
}
return false;
_logger.LogTrace("Ending receive loop");
}
public Task<bool> SendAsync(byte[] data, MessageType type)
@ -201,7 +189,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
public async Task<bool> SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken)
{
// TODO: data == null?
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (_connectionState != ConnectionState.Connected)
{
return false;
@ -233,6 +225,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
await _transport.StopAsync();
}
if (_receiveLoopTask != null)
{
await _receiveLoopTask;
}
}
public void Dispose()

View File

@ -66,14 +66,27 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory))
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
{
if (e != null)
{
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetResult(null);
}
};
await connection.StartAsync(transport, httpClient);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
var receiveData = new ReceiveData();
Assert.True(await connection.ReceiveAsync(receiveData).OrTimeout());
Assert.Equal(message, Encoding.UTF8.GetString(receiveData.Data));
Assert.Equal(message, await receiveTcs.Task.OrTimeout());
await connection.StopAsync();
}
@ -100,14 +113,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var transport = new WebSocketsTransport();
using (var connection = new ClientConnection(new Uri(baseUrl + "/echo/ws"), loggerFactory))
{
var receiveTcs = new TaskCompletionSource<byte[]>();
connection.Received += (data, messageType) => receiveTcs.SetResult(data);
await connection.StartAsync(transport);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
var receiveData = new ReceiveData();
Assert.True(await connection.ReceiveAsync(receiveData).OrTimeout());
Assert.Equal(message, Encoding.UTF8.GetString(receiveData.Data));
var receivedData = await receiveTcs.Task.OrTimeout();
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
await connection.StopAsync();
}

View File

@ -199,7 +199,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
}
[Fact(Skip = "Need draining to make it work. Receive event may fix that.")]
[Fact]
public async Task ClosedEventRaisedWhenTheClientIsStopped()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
@ -221,14 +221,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
// in case of clean disconnect error should be null
Assert.Null(await closedEventTcs.Task);
}
}
[Fact(Skip = "Need draining to make it work. Receive event may fix that.")]
[Fact]
public async Task ClosedEventRaisedWhenTheClientIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
@ -260,7 +259,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
[Fact]
public async Task ClosedEventRaisedWhenConnectionToServerLost()
{
var allowPollTcs = new TaskCompletionSource<object>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -269,7 +267,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Task.Yield();
if (request.RequestUri.AbsolutePath.EndsWith("/poll"))
{
await allowPollTcs.Task;
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
@ -283,10 +280,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
connection.Closed += e => closedEventTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
var receiveTask = connection.ReceiveAsync(new ReceiveData());
allowPollTcs.TrySetResult(null);
await Assert.ThrowsAsync<HttpRequestException>(async () => await receiveTask);
Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task));
Assert.IsType<HttpRequestException>(await closedEventTcs.Task);
}
@ -411,11 +404,24 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var receiveTcs = new TaskCompletionSource<string>();
connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data));
connection.Closed += e =>
{
if (e != null)
{
receiveTcs.TrySetException(e);
}
else
{
receiveTcs.TrySetCanceled();
}
};
await connection.StartAsync(longPollingTransport, httpClient);
var receiveData = new ReceiveData();
Assert.True(await connection.ReceiveAsync(receiveData));
Assert.Equal("42", Encoding.UTF8.GetString(receiveData.Data));
// TODO: timeout
Assert.Equal("42", await receiveTcs.Task);
await connection.StopAsync();
}
@ -443,36 +449,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
}
[Fact]
public async Task CannotReceiveAfterConnectionIsStopped()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
await connection.StartAsync(longPollingTransport, httpClient);
await connection.StopAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.ReceiveAsync(new ReceiveData()));
Assert.Equal("Cannot receive messages when the connection is stopped.", exception.Message);
}
}
[Fact]
public async Task CannotSendAfterReceiveThrewException()
{
var allowPollTcs = new TaskCompletionSource<object>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -481,7 +460,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Task.Yield();
if (request.RequestUri.AbsolutePath.EndsWith("/poll"))
{
await allowPollTcs.Task;
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
@ -491,11 +469,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
var receiveTask = connection.ReceiveAsync(new ReceiveData());
allowPollTcs.TrySetResult(null);
await Assert.ThrowsAsync<HttpRequestException>(async () => await receiveTask);
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
@ -504,7 +484,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
[Fact]
public async Task CannotReceiveAfterReceiveThrewException()
{
var allowPollTcs = new TaskCompletionSource<object>();
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
@ -513,7 +492,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
await Task.Yield();
if (request.RequestUri.AbsolutePath.EndsWith("/poll"))
{
await allowPollTcs.Task;
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) };
}
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
@ -523,16 +501,15 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory()))
using (var connection = new Connection(new Uri("http://fakeuri.org/")))
{
var closeTcs = new TaskCompletionSource<Exception>();
connection.Closed += e => closeTcs.TrySetResult(e);
await connection.StartAsync(longPollingTransport, httpClient);
var receiveTask = connection.ReceiveAsync(new ReceiveData());
allowPollTcs.TrySetResult(null);
await Assert.ThrowsAsync<HttpRequestException>(async () => await receiveTask);
// Exception in send should shutdown the connection
await closeTcs.Task.OrTimeout();
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
async () => await connection.ReceiveAsync(new ReceiveData()));
Assert.Equal("Cannot receive messages when the connection is stopped.", exception.Message);
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
}
}