From d4fbdd055a7c31dbada5316006856dfae9ae6d58 Mon Sep 17 00:00:00 2001 From: moozzyk Date: Thu, 9 Feb 2017 10:50:37 -0800 Subject: [PATCH] Raising Received event --- samples/ClientSample/RawSample.cs | 35 ++------ .../HubConnection.cs | 59 +++++--------- .../Connection.cs | 53 ++++++------ .../EndToEndTests.cs | 24 +++++- .../ConnectionTests.cs | 81 +++++++------------ 5 files changed, 99 insertions(+), 153 deletions(-) diff --git a/samples/ClientSample/RawSample.cs b/samples/ClientSample/RawSample.cs index 84791b4f21..2db570d9a0 100644 --- a/samples/ClientSample/RawSample.cs +++ b/samples/ClientSample/RawSample.cs @@ -32,11 +32,14 @@ namespace ClientSample var transport = new LongPollingTransport(httpClient, loggerFactory); using (var connection = new Connection(new Uri(baseUrl), loggerFactory)) { + var cts = new CancellationTokenSource(); + connection.Received += (data, format) => logger.LogInformation($"Received: {Encoding.UTF8.GetString(data)}"); + connection.Closed += e => cts.Cancel(); + await connection.StartAsync(transport, httpClient); logger.LogInformation("Connected to {0}", baseUrl); - var cts = new CancellationTokenSource(); Console.CancelKeyPress += (sender, a) => { a.Cancel = true; @@ -44,13 +47,8 @@ namespace ClientSample cts.Cancel(); }; - // Ready to start the loops - var receive = - StartReceiving(loggerFactory.CreateLogger("ReceiveLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel()); - var send = - StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel()); + await StartSending(loggerFactory.CreateLogger("SendLoop"), connection, cts.Token).ContinueWith(_ => cts.Cancel()); - await Task.WhenAll(receive, send); await connection.StopAsync(); } } @@ -68,28 +66,5 @@ namespace ClientSample } logger.LogInformation("Send loop terminated"); } - - private static async Task StartReceiving(ILogger logger, Connection connection, CancellationToken cancellationToken) - { - logger.LogInformation("Receive loop starting"); - try - { - var receiveData = new ReceiveData(); - while (await connection.ReceiveAsync(receiveData, cancellationToken)) - { - logger.LogInformation($"Received: {Encoding.UTF8.GetString(receiveData.Data)}"); - } - } - catch (OperationCanceledException) - { - logger.LogInformation("Connection is closing"); - } - catch (Exception ex) - { - logger.LogError(0, ex, "Connection terminated due to an exception"); - } - - logger.LogInformation("Receive loop terminated"); - } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Client/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client/HubConnection.cs index 347ac8bae6..d9a6abfa38 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client/HubConnection.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client/HubConnection.cs @@ -18,13 +18,11 @@ namespace Microsoft.AspNetCore.SignalR.Client { public class HubConnection : IDisposable { - private readonly Task _reader; private readonly ILogger _logger; private readonly Connection _connection; private readonly IInvocationAdapter _adapter; private readonly HubBinder _binder; - private readonly CancellationTokenSource _readerCts = new CancellationTokenSource(); private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource(); // We need to ensure pending calls added after a connection failure don't hang. Right now the easiest thing to do is lock. @@ -42,7 +40,8 @@ namespace Microsoft.AspNetCore.SignalR.Client _adapter = adapter; _logger = logger; - _reader = ReceiveMessages(_readerCts.Token); + // TODO HIGH: Need to subscribe to events before starting connection. Requires converting HubConnection + _connection.Received += OnDataReceived; } // TODO: Client return values/tasks? @@ -106,9 +105,10 @@ namespace Microsoft.AspNetCore.SignalR.Client return await irq.Completion.Task; } + // TODO HIGH - need StopAsync + public void Dispose() { - _readerCts.Cancel(); _connection.Dispose(); } @@ -124,49 +124,30 @@ namespace Microsoft.AspNetCore.SignalR.Client return new HubConnection(connection, adapter, loggerFactory.CreateLogger()); } - private async Task ReceiveMessages(CancellationToken cancellationToken) + private async void OnDataReceived(byte[] data, MessageType messageType) { - await Task.Yield(); + var message + = await _adapter.ReadMessageAsync(new MemoryStream(data), _binder, _connectionActive.Token); - _logger.LogTrace("Beginning receive loop"); - try + switch (message) { - ReceiveData receiveData = new ReceiveData(); - while (await _connection.ReceiveAsync(receiveData, cancellationToken)) - { - var message - = await _adapter.ReadMessageAsync(new MemoryStream(receiveData.Data), _binder, cancellationToken); - - switch (message) + case InvocationDescriptor invocationDescriptor: + DispatchInvocation(invocationDescriptor, _connectionActive.Token); + break; + case InvocationResultDescriptor invocationResultDescriptor: + InvocationRequest irq; + lock (_pendingCallsLock) { - case InvocationDescriptor invocationDescriptor: - DispatchInvocation(invocationDescriptor, cancellationToken); - break; - case InvocationResultDescriptor invocationResultDescriptor: - InvocationRequest irq; - lock (_pendingCallsLock) - { - _connectionActive.Token.ThrowIfCancellationRequested(); - irq = _pendingCalls[invocationResultDescriptor.Id]; - _pendingCalls.Remove(invocationResultDescriptor.Id); - } - DispatchInvocationResult(invocationResultDescriptor, irq, cancellationToken); - break; + _connectionActive.Token.ThrowIfCancellationRequested(); + irq = _pendingCalls[invocationResultDescriptor.Id]; + _pendingCalls.Remove(invocationResultDescriptor.Id); } - } - Shutdown(); - } - catch (Exception ex) - { - Shutdown(ex); - throw; - } - finally - { - _logger.LogTrace("Ending receive loop"); + DispatchInvocationResult(invocationResultDescriptor, irq, _connectionActive.Token); + break; } } + // TODO HIGH! private void Shutdown(Exception ex = null) { _logger.LogTrace("Shutting down connection"); diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index 1876b4e52e..a25e11f640 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -19,6 +19,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private int _connectionState = ConnectionState.Initial; private IChannelConnection _transportChannel; private ITransport _transport; + private Task _receiveLoopTask; private ReadableChannel Input => _transportChannel.Input; private WritableChannel Output => _transportChannel.Output; @@ -26,7 +27,7 @@ namespace Microsoft.AspNetCore.Sockets.Client public Uri Url { get; } public event Action Connected; - public event Action Received; + public event Action Received; public event Action Closed; public Connection(Uri url) @@ -67,6 +68,9 @@ namespace Microsoft.AspNetCore.Sockets.Client throw; } + // start receive loop + _receiveLoopTask = ReceiveAsync(); + Interlocked.Exchange(ref _connectionState, ConnectionState.Connected); // Do not "simplify" - events can be removed from a different thread @@ -146,52 +150,36 @@ namespace Microsoft.AspNetCore.Sockets.Client } } - public Task ReceiveAsync(ReceiveData receiveData) + private async Task ReceiveAsync() { - return ReceiveAsync(receiveData, CancellationToken.None); - } - - public async Task ReceiveAsync(ReceiveData receiveData, CancellationToken cancellationToken) - { - if (receiveData == null) - { - throw new ArgumentNullException(nameof(receiveData)); - } - - if (Input.Completion.IsCompleted) - { - throw new InvalidOperationException("Cannot receive messages when the connection is stopped."); - } - try { - while (await Input.WaitToReadAsync(cancellationToken)) + _logger.LogTrace("Beginning receive loop"); + + while (await Input.WaitToReadAsync()) { if (Input.TryRead(out Message message)) { using (message) { - receiveData.MessageType = message.Type; - receiveData.Data = message.Payload.Buffer.ToArray(); - return true; + var receivedEventHandler = Received; + if (receivedEventHandler != null) + { + receivedEventHandler(message.Payload.Buffer.ToArray(), message.Type); + } } } } await Input.Completion; } - catch (OperationCanceledException) - { - // channel is being closed - } catch (Exception ex) { Output.TryComplete(ex); _logger.LogError("Error receiving message: {0}", ex); - throw; } - return false; + _logger.LogTrace("Ending receive loop"); } public Task SendAsync(byte[] data, MessageType type) @@ -201,7 +189,11 @@ namespace Microsoft.AspNetCore.Sockets.Client public async Task SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken) { - // TODO: data == null? + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + if (_connectionState != ConnectionState.Connected) { return false; @@ -233,6 +225,11 @@ namespace Microsoft.AspNetCore.Sockets.Client { await _transport.StopAsync(); } + + if (_receiveLoopTask != null) + { + await _receiveLoopTask; + } } public void Dispose() diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs index ca8ce16069..f4071ab16a 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs @@ -66,14 +66,27 @@ namespace Microsoft.AspNetCore.SignalR.Tests var transport = new LongPollingTransport(httpClient, loggerFactory); using (var connection = new ClientConnection(new Uri(baseUrl + "/echo"), loggerFactory)) { + var receiveTcs = new TaskCompletionSource(); + connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); + connection.Closed += e => + { + if (e != null) + { + receiveTcs.TrySetException(e); + } + else + { + receiveTcs.TrySetResult(null); + } + }; + await connection.StartAsync(transport, httpClient); await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text); var receiveData = new ReceiveData(); - Assert.True(await connection.ReceiveAsync(receiveData).OrTimeout()); - Assert.Equal(message, Encoding.UTF8.GetString(receiveData.Data)); + Assert.Equal(message, await receiveTcs.Task.OrTimeout()); await connection.StopAsync(); } @@ -100,14 +113,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests var transport = new WebSocketsTransport(); using (var connection = new ClientConnection(new Uri(baseUrl + "/echo/ws"), loggerFactory)) { + var receiveTcs = new TaskCompletionSource(); + connection.Received += (data, messageType) => receiveTcs.SetResult(data); + await connection.StartAsync(transport); await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text); var receiveData = new ReceiveData(); - Assert.True(await connection.ReceiveAsync(receiveData).OrTimeout()); - Assert.Equal(message, Encoding.UTF8.GetString(receiveData.Data)); + var receivedData = await receiveTcs.Task.OrTimeout(); + Assert.Equal(message, Encoding.UTF8.GetString(receivedData)); await connection.StopAsync(); } diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs index 9dc0bad723..95ead8716b 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs @@ -199,7 +199,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } } - [Fact(Skip = "Need draining to make it work. Receive event may fix that.")] + [Fact] public async Task ClosedEventRaisedWhenTheClientIsStopped() { var mockHttpHandler = new Mock(); @@ -221,14 +221,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests await connection.StartAsync(longPollingTransport, httpClient); await connection.StopAsync(); - Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task)); // in case of clean disconnect error should be null Assert.Null(await closedEventTcs.Task); } } - [Fact(Skip = "Need draining to make it work. Receive event may fix that.")] + [Fact] public async Task ClosedEventRaisedWhenTheClientIsDisposed() { var mockHttpHandler = new Mock(); @@ -260,7 +259,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests [Fact] public async Task ClosedEventRaisedWhenConnectionToServerLost() { - var allowPollTcs = new TaskCompletionSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) @@ -269,7 +267,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests await Task.Yield(); if (request.RequestUri.AbsolutePath.EndsWith("/poll")) { - await allowPollTcs.Task; return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; } return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; @@ -283,10 +280,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests connection.Closed += e => closedEventTcs.TrySetResult(e); await connection.StartAsync(longPollingTransport, httpClient); - var receiveTask = connection.ReceiveAsync(new ReceiveData()); - allowPollTcs.TrySetResult(null); - await Assert.ThrowsAsync(async () => await receiveTask); - Assert.Equal(closedEventTcs.Task, await Task.WhenAny(Task.Delay(1000), closedEventTcs.Task)); Assert.IsType(await closedEventTcs.Task); } @@ -411,11 +404,24 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) using (var connection = new Connection(new Uri("http://fakeuri.org/"))) { + var receiveTcs = new TaskCompletionSource(); + connection.Received += (data, format) => receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); + connection.Closed += e => + { + if (e != null) + { + receiveTcs.TrySetException(e); + } + else + { + receiveTcs.TrySetCanceled(); + } + }; + await connection.StartAsync(longPollingTransport, httpClient); - var receiveData = new ReceiveData(); - Assert.True(await connection.ReceiveAsync(receiveData)); - Assert.Equal("42", Encoding.UTF8.GetString(receiveData.Data)); + // TODO: timeout + Assert.Equal("42", await receiveTcs.Task); await connection.StopAsync(); } @@ -443,36 +449,9 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } } - [Fact] - public async Task CannotReceiveAfterConnectionIsStopped() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; - }); - - using (var httpClient = new HttpClient(mockHttpHandler.Object)) - using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) - using (var connection = new Connection(new Uri("http://fakeuri.org/"))) - { - await connection.StartAsync(longPollingTransport, httpClient); - - await connection.StopAsync(); - var exception = await Assert.ThrowsAsync( - async () => await connection.ReceiveAsync(new ReceiveData())); - - Assert.Equal("Cannot receive messages when the connection is stopped.", exception.Message); - } - } - [Fact] public async Task CannotSendAfterReceiveThrewException() { - var allowPollTcs = new TaskCompletionSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) @@ -481,7 +460,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests await Task.Yield(); if (request.RequestUri.AbsolutePath.EndsWith("/poll")) { - await allowPollTcs.Task; return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; } return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; @@ -491,11 +469,13 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) using (var connection = new Connection(new Uri("http://fakeuri.org/"))) { + var closeTcs = new TaskCompletionSource(); + connection.Closed += e => closeTcs.TrySetResult(e); + await connection.StartAsync(longPollingTransport, httpClient); - var receiveTask = connection.ReceiveAsync(new ReceiveData()); - allowPollTcs.TrySetResult(null); - await Assert.ThrowsAsync(async () => await receiveTask); + // Exception in send should shutdown the connection + await closeTcs.Task.OrTimeout(); Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary)); } @@ -504,7 +484,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests [Fact] public async Task CannotReceiveAfterReceiveThrewException() { - var allowPollTcs = new TaskCompletionSource(); var mockHttpHandler = new Mock(); mockHttpHandler.Protected() .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) @@ -513,7 +492,6 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests await Task.Yield(); if (request.RequestUri.AbsolutePath.EndsWith("/poll")) { - await allowPollTcs.Task; return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(string.Empty) }; } return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; @@ -523,16 +501,15 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests using (var longPollingTransport = new LongPollingTransport(httpClient, new LoggerFactory())) using (var connection = new Connection(new Uri("http://fakeuri.org/"))) { + var closeTcs = new TaskCompletionSource(); + connection.Closed += e => closeTcs.TrySetResult(e); + await connection.StartAsync(longPollingTransport, httpClient); - var receiveTask = connection.ReceiveAsync(new ReceiveData()); - allowPollTcs.TrySetResult(null); - await Assert.ThrowsAsync(async () => await receiveTask); + // Exception in send should shutdown the connection + await closeTcs.Task.OrTimeout(); - var exception = await Assert.ThrowsAsync( - async () => await connection.ReceiveAsync(new ReceiveData())); - - Assert.Equal("Cannot receive messages when the connection is stopped.", exception.Message); + Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary)); } } }