Dispatching raising events to separate threads

This commit is contained in:
moozzyk 2017-03-03 14:43:29 -08:00
parent a93839e1b2
commit 02df601715
4 changed files with 152 additions and 23 deletions

View File

@ -22,6 +22,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
private volatile ITransport _transport;
private volatile Task _receiveLoopTask;
private volatile Task _startTask = Task.CompletedTask;
private TaskQueue _eventQueue = new TaskQueue();
private ReadableChannel<Message> Input => _transportChannel.Input;
private WritableChannel<SendMessage> Output => _transportChannel.Output;
@ -85,26 +86,36 @@ namespace Microsoft.AspNetCore.Sockets.Client
if (Interlocked.CompareExchange(ref _connectionState, ConnectionState.Connected, ConnectionState.Connecting)
== ConnectionState.Connecting)
{
// Do not "simplify" - events can be removed from a different thread
var connectedEventHandler = Connected;
if (connectedEventHandler != null)
var ignore = _eventQueue.Enqueue(() =>
{
connectedEventHandler();
}
// Do not "simplify" - events can be removed from a different thread
var connectedEventHandler = Connected;
if (connectedEventHandler != null)
{
connectedEventHandler();
}
var ignore = Input.Completion.ContinueWith(t =>
return Task.CompletedTask;
});
ignore = Input.Completion.ContinueWith(async t =>
{
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
// Do not "simplify" - events can be removed from a different thread
await _eventQueue.Drain();
// Do not "simplify" - event handlers can be removed from a different thread
var closedEventHandler = Closed;
if (closedEventHandler != null)
{
closedEventHandler(t.IsFaulted ? t.Exception.InnerException : null);
}
return Task.CompletedTask;
});
// start receive loop
// start receive loop only after the Connected event was raised to
// avoid Received event being raised before the Connected event
_receiveLoopTask = ReceiveAsync();
}
}
@ -169,18 +180,36 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
try
{
_logger.LogTrace("Beginning receive loop");
_logger.LogTrace("Beginning receive loop.");
while (await Input.WaitToReadAsync())
{
if (_connectionState != ConnectionState.Connected)
{
_logger.LogDebug("Message received but connection is not connected. Skipping raising Received event.");
// drain
Input.TryRead(out Message ignore);
continue;
}
if (Input.TryRead(out Message message))
{
// Do not "simplify" - events can be removed from a different thread
var receivedEventHandler = Received;
if (receivedEventHandler != null)
_logger.LogDebug("Scheduling raising Received event.");
var ignore = _eventQueue.Enqueue(() =>
{
receivedEventHandler(message.Payload, message.Type);
}
// Do not "simplify" - event handlers can be removed from a different thread
var receivedEventHandler = Received;
if (receivedEventHandler != null)
{
receivedEventHandler(message.Payload, message.Type);
}
return Task.CompletedTask;
});
}
else
{
_logger.LogDebug("Could not read message.");
}
}
@ -232,6 +261,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
public async Task DisposeAsync()
{
_logger.LogInformation("Stopping client.");
Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected);
try
{

View File

@ -201,12 +201,12 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
var connection = new Connection(new Uri("http://fakeuri.org/"));
try
{
var connectedEventRaised = false;
connection.Connected += () => connectedEventRaised = true;
var connectedEventRaisedTcs = new TaskCompletionSource<object>();
connection.Connected += () => connectedEventRaisedTcs.SetResult(null);
await connection.StartAsync(longPollingTransport, httpClient);
Assert.True(connectedEventRaised);
await connectedEventRaisedTcs.Task.OrTimeout();
}
finally
{
@ -315,6 +315,104 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
}
}
[Fact]
public async Task ReceivedEventNotRaisedAfterConnectionIsDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
var mockTransport = new Mock<ITransport>();
IChannelConnection<SendMessage, Message> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, Message>>()))
.Returns<Uri, IChannelConnection<SendMessage, Message>>((url, c) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
// The connection is now in the Disconnected state so the Received event for
// this message should not be raised
channel.Output.TryWrite(new Message());
channel.Output.TryComplete();
return Task.CompletedTask;
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var connection = new Connection(new Uri("http://fakeuri.org/"));
var receivedInvoked = false;
connection.Received += (m, t) => receivedInvoked = true;
await connection.StartAsync(mockTransport.Object, httpClient);
await connection.DisposeAsync();
Assert.False(receivedInvoked);
}
}
[Fact]
public async Task EventsAreNotRunningOnMainLoop()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
{
await Task.Yield();
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(string.Empty) };
});
var mockTransport = new Mock<ITransport>();
IChannelConnection<SendMessage, Message> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<IChannelConnection<SendMessage, Message>>()))
.Returns<Uri, IChannelConnection<SendMessage, Message>>((url, c) =>
{
channel = c;
return Task.CompletedTask;
});
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Output.TryComplete();
return Task.CompletedTask;
});
using (var httpClient = new HttpClient(mockHttpHandler.Object))
{
var closedTcs = new TaskCompletionSource<object>();
var allowDisposeTcs = new TaskCompletionSource<object>();
int receivedInvocationCount = 0;
var connection = new Connection(new Uri("http://fakeuri.org/"));
connection.Received +=
async (m, t) =>
{
if (Interlocked.Increment(ref receivedInvocationCount) == 2)
{
allowDisposeTcs.TrySetResult(null);
}
await closedTcs.Task;
};
connection.Closed += e => closedTcs.SetResult(null);
await connection.StartAsync(mockTransport.Object, httpClient);
channel.Output.TryWrite(new Message());
channel.Output.TryWrite(new Message());
await allowDisposeTcs.Task.OrTimeout();
await connection.DisposeAsync();
Assert.Equal(2, receivedInvocationCount);
// if the events were running on the main loop they would deadlock
await closedTcs.Task.OrTimeout();
}
}
[Fact]
public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted()
{

View File

@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[Fact]
public async Task CanDisposeNotStartedHubConnection()
{
await new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), Mock.Of<ILoggerFactory>())
await new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory())
.DisposeAsync();
}
@ -129,12 +129,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = new HubConnection(new Uri("http://fakeuri.org"), Mock.Of<IInvocationAdapter>(), new LoggerFactory());
try
{
var connectedEventRaised = false;
hubConnection.Connected += () => connectedEventRaised = true;
var connectedEventRaisedTcs = new TaskCompletionSource<object>();
hubConnection.Connected += () => connectedEventRaisedTcs.SetResult(null);
await hubConnection.StartAsync(longPollingTransport, httpClient);
Assert.True(connectedEventRaised);
await connectedEventRaisedTcs.Task.OrTimeout();
}
finally
{

View File

@ -40,13 +40,13 @@ namespace Microsoft.AspNetCore.Client.Tests
int n = 0;
queue.Enqueue(() =>
{
n++;
n = 1;
return Task.CompletedTask;
});
Task task = queue.Enqueue(() =>
{
return Task.Delay(100).ContinueWith(t => n++);
return Task.Delay(100).ContinueWith(t => n = 2);
});
task.Wait();