diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs index adcb911639..2839a974f0 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs @@ -15,22 +15,35 @@ namespace Microsoft.AspNetCore.SignalR.Internal { // TODO: Allow bounding and optimizations? var channel = Channel.CreateUnbounded(); + var observer = new ObserverState(); + var channelObserver = new ChannelObserver(channel.Writer); + observer.Subscription = observable.Subscribe(channelObserver); + observer.TokenRegistration = cancellationToken.Register(obs => ((ChannelObserver)obs).OnCompleted(), channelObserver); - var subscription = observable.Subscribe(new ChannelObserver(channel.Writer, cancellationToken)); + // Make sure the subscription and token registration is disposed when enumeration is completed. + return new AsyncEnumerator(channel.Reader, cancellationToken, observer); + } - // Make sure the subscription is disposed when enumeration is completed. - return new AsyncEnumerator(channel.Reader, cancellationToken, subscription); + // To track and dispose of the Subscription and the cancellation token registration. + private class ObserverState : IDisposable + { + public CancellationTokenRegistration TokenRegistration; + public IDisposable Subscription; + + public void Dispose() + { + TokenRegistration.Dispose(); + Subscription.Dispose(); + } } private class ChannelObserver : IObserver { private readonly ChannelWriter _output; - private readonly CancellationToken _cancellationToken; - public ChannelObserver(ChannelWriter output, CancellationToken cancellationToken) + public ChannelObserver(ChannelWriter output) { _output = output; - _cancellationToken = cancellationToken; } public void OnCompleted() @@ -45,12 +58,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal public void OnNext(T value) { - if (_cancellationToken.IsCancellationRequested) - { - // Noop, someone else is handling the cancellation - return; - } - // This will block the thread emitting the object if the channel is bounded and full // I think this is OK, since we want to push the backpressure up. However, we may need // to find a way to force the entire subscription off to a dedicated thread in order to @@ -60,10 +67,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal while (!_output.TryWrite(value)) { // Wait for a spot - if (!_output.WaitToWriteAsync(_cancellationToken).Result) + if (!_output.WaitToWriteAsync().Result) { - // Channel was closed. - throw new InvalidOperationException("Output channel was closed"); + // Channel was closed so we just no-op. The observer shouldn't throw. + return; } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs index 4d758f4fc7..1bb27f1637 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs @@ -148,6 +148,49 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Fact] + public async Task OberserverDoesntThrowWhenOnNextIsCalledAfterChannelIsCompleted() + { + var observable = new Observable(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable)); + var connectionHandler = serviceProvider.GetService>(); + + var waitForSubscribe = new TaskCompletionSource(); + observable.OnSubscribe = o => + { + waitForSubscribe.TrySetResult(null); + }; + + var waitForDispose = new TaskCompletionSource(); + observable.OnDispose = o => + { + waitForDispose.TrySetResult(null); + }; + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + var subscribeTask = client.StreamAsync(nameof(ObservableHub.Subscribe)); + + await waitForSubscribe.Task.OrTimeout(); + + Assert.Single(observable.Observers); + + // Disposing the client to complete the observer. Further calls to OnNext should no-op + client.Dispose(); + + // Calling OnNext after the client has disconnected shouldn't throw. + observable.OnNext(1); + + await waitForDispose.Task.OrTimeout(); + + Assert.Empty(observable.Observers); + + await connectionHandlerTask.OrTimeout(); + } + } + [Fact] public async Task ObservableHubRemovesSubscriptions() {