From 02df601715f345c0596d2f74e4fb893dfe8827a5 Mon Sep 17 00:00:00 2001 From: moozzyk Date: Fri, 3 Mar 2017 14:43:29 -0800 Subject: [PATCH] Dispatching raising events to separate threads --- .../Connection.cs | 59 +++++++--- .../ConnectionTests.cs | 104 +++++++++++++++++- .../HubConnectionTests.cs | 8 +- .../TaskQueueTests.cs | 4 +- 4 files changed, 152 insertions(+), 23 deletions(-) diff --git a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs index d02cc0bfb6..c4cd19e6ea 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client/Connection.cs @@ -22,6 +22,7 @@ namespace Microsoft.AspNetCore.Sockets.Client private volatile ITransport _transport; private volatile Task _receiveLoopTask; private volatile Task _startTask = Task.CompletedTask; + private TaskQueue _eventQueue = new TaskQueue(); private ReadableChannel Input => _transportChannel.Input; private WritableChannel Output => _transportChannel.Output; @@ -85,26 +86,36 @@ namespace Microsoft.AspNetCore.Sockets.Client if (Interlocked.CompareExchange(ref _connectionState, ConnectionState.Connected, ConnectionState.Connecting) == ConnectionState.Connecting) { - // Do not "simplify" - events can be removed from a different thread - var connectedEventHandler = Connected; - if (connectedEventHandler != null) + var ignore = _eventQueue.Enqueue(() => { - connectedEventHandler(); - } + // Do not "simplify" - events can be removed from a different thread + var connectedEventHandler = Connected; + if (connectedEventHandler != null) + { + connectedEventHandler(); + } - var ignore = Input.Completion.ContinueWith(t => + return Task.CompletedTask; + }); + + ignore = Input.Completion.ContinueWith(async t => { Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected); - // Do not "simplify" - events can be removed from a different thread + await _eventQueue.Drain(); + + // Do not "simplify" - event handlers can be removed from a different thread var closedEventHandler = Closed; if (closedEventHandler != null) { closedEventHandler(t.IsFaulted ? t.Exception.InnerException : null); } + + return Task.CompletedTask; }); - // start receive loop + // start receive loop only after the Connected event was raised to + // avoid Received event being raised before the Connected event _receiveLoopTask = ReceiveAsync(); } } @@ -169,18 +180,36 @@ namespace Microsoft.AspNetCore.Sockets.Client { try { - _logger.LogTrace("Beginning receive loop"); + _logger.LogTrace("Beginning receive loop."); while (await Input.WaitToReadAsync()) { + if (_connectionState != ConnectionState.Connected) + { + _logger.LogDebug("Message received but connection is not connected. Skipping raising Received event."); + // drain + Input.TryRead(out Message ignore); + continue; + } + if (Input.TryRead(out Message message)) { - // Do not "simplify" - events can be removed from a different thread - var receivedEventHandler = Received; - if (receivedEventHandler != null) + _logger.LogDebug("Scheduling raising Received event."); + var ignore = _eventQueue.Enqueue(() => { - receivedEventHandler(message.Payload, message.Type); - } + // Do not "simplify" - event handlers can be removed from a different thread + var receivedEventHandler = Received; + if (receivedEventHandler != null) + { + receivedEventHandler(message.Payload, message.Type); + } + + return Task.CompletedTask; + }); + } + else + { + _logger.LogDebug("Could not read message."); } } @@ -232,6 +261,8 @@ namespace Microsoft.AspNetCore.Sockets.Client public async Task DisposeAsync() { + _logger.LogInformation("Stopping client."); + Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected); try { diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs index b94257359b..a20512b6c4 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/ConnectionTests.cs @@ -201,12 +201,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests var connection = new Connection(new Uri("http://fakeuri.org/")); try { - var connectedEventRaised = false; - connection.Connected += () => connectedEventRaised = true; + var connectedEventRaisedTcs = new TaskCompletionSource(); + connection.Connected += () => connectedEventRaisedTcs.SetResult(null); await connection.StartAsync(longPollingTransport, httpClient); - Assert.True(connectedEventRaised); + await connectedEventRaisedTcs.Task.OrTimeout(); } finally { @@ -315,6 +315,104 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests } } + [Fact] + public async Task ReceivedEventNotRaisedAfterConnectionIsDisposed() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + var mockTransport = new Mock(); + IChannelConnection channel = null; + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + .Returns>((url, c) => + { + channel = c; + return Task.CompletedTask; + }); + mockTransport.Setup(t => t.StopAsync()) + .Returns(() => + { + // The connection is now in the Disconnected state so the Received event for + // this message should not be raised + channel.Output.TryWrite(new Message()); + channel.Output.TryComplete(); + return Task.CompletedTask; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + { + var connection = new Connection(new Uri("http://fakeuri.org/")); + var receivedInvoked = false; + connection.Received += (m, t) => receivedInvoked = true; + + await connection.StartAsync(mockTransport.Object, httpClient); + await connection.DisposeAsync(); + Assert.False(receivedInvoked); + } + } + + [Fact] + public async Task EventsAreNotRunningOnMainLoop() + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) }; + }); + + var mockTransport = new Mock(); + IChannelConnection channel = null; + mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>())) + .Returns>((url, c) => + { + channel = c; + return Task.CompletedTask; + }); + mockTransport.Setup(t => t.StopAsync()) + .Returns(() => + { + channel.Output.TryComplete(); + return Task.CompletedTask; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + { + var closedTcs = new TaskCompletionSource(); + var allowDisposeTcs = new TaskCompletionSource(); + int receivedInvocationCount = 0; + + var connection = new Connection(new Uri("http://fakeuri.org/")); + connection.Received += + async (m, t) => + { + if (Interlocked.Increment(ref receivedInvocationCount) == 2) + { + allowDisposeTcs.TrySetResult(null); + } + await closedTcs.Task; + }; + connection.Closed += e => closedTcs.SetResult(null); + + await connection.StartAsync(mockTransport.Object, httpClient); + channel.Output.TryWrite(new Message()); + channel.Output.TryWrite(new Message()); + await allowDisposeTcs.Task.OrTimeout(); + await connection.DisposeAsync(); + Assert.Equal(2, receivedInvocationCount); + // if the events were running on the main loop they would deadlock + await closedTcs.Task.OrTimeout(); + } + } + [Fact] public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted() { diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/HubConnectionTests.cs index e8be5baf4c..f3b0dad029 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/HubConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/HubConnectionTests.cs @@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests [Fact] public async Task CanDisposeNotStartedHubConnection() { - await new HubConnection(new Uri("http://fakeuri.org"), Mock.Of(), Mock.Of()) + await new HubConnection(new Uri("http://fakeuri.org"), Mock.Of(), new LoggerFactory()) .DisposeAsync(); } @@ -129,12 +129,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of(), new LoggerFactory()); try { - var connectedEventRaised = false; - hubConnection.Connected += () => connectedEventRaised = true; + var connectedEventRaisedTcs = new TaskCompletionSource(); + hubConnection.Connected += () => connectedEventRaisedTcs.SetResult(null); await hubConnection.StartAsync(longPollingTransport, httpClient); - Assert.True(connectedEventRaised); + await connectedEventRaisedTcs.Task.OrTimeout(); } finally { diff --git a/test/Microsoft.AspNetCore.Sockets.Client.Tests/TaskQueueTests.cs b/test/Microsoft.AspNetCore.Sockets.Client.Tests/TaskQueueTests.cs index 7ae8dfce67..d74c9ca2cc 100644 --- a/test/Microsoft.AspNetCore.Sockets.Client.Tests/TaskQueueTests.cs +++ b/test/Microsoft.AspNetCore.Sockets.Client.Tests/TaskQueueTests.cs @@ -40,13 +40,13 @@ namespace Microsoft.AspNetCore.Client.Tests int n = 0; queue.Enqueue(() => { - n++; + n = 1; return Task.CompletedTask; }); Task task = queue.Enqueue(() => { - return Task.Delay(100).ContinueWith(t => n++); + return Task.Delay(100).ContinueWith(t => n = 2); }); task.Wait();