Don't throw from Observer's OnNext (#1872)

This commit is contained in:
Mikael Mengistu 2018-04-06 00:35:04 -07:00 committed by GitHub
parent 36edadabb4
commit 4ddf8664c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 15 deletions

View File

@ -15,22 +15,35 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
// TODO: Allow bounding and optimizations?
var channel = Channel.CreateUnbounded<object>();
var observer = new ObserverState();
var channelObserver = new ChannelObserver<T>(channel.Writer);
observer.Subscription = observable.Subscribe(channelObserver);
observer.TokenRegistration = cancellationToken.Register(obs => ((ChannelObserver<T>)obs).OnCompleted(), channelObserver);
var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Writer, cancellationToken));
// Make sure the subscription and token registration is disposed when enumeration is completed.
return new AsyncEnumerator<object>(channel.Reader, cancellationToken, observer);
}
// Make sure the subscription is disposed when enumeration is completed.
return new AsyncEnumerator<object>(channel.Reader, cancellationToken, subscription);
// To track and dispose of the Subscription and the cancellation token registration.
private class ObserverState : IDisposable
{
public CancellationTokenRegistration TokenRegistration;
public IDisposable Subscription;
public void Dispose()
{
TokenRegistration.Dispose();
Subscription.Dispose();
}
}
private class ChannelObserver<T> : IObserver<T>
{
private readonly ChannelWriter<object> _output;
private readonly CancellationToken _cancellationToken;
public ChannelObserver(ChannelWriter<object> output, CancellationToken cancellationToken)
public ChannelObserver(ChannelWriter<object> output)
{
_output = output;
_cancellationToken = cancellationToken;
}
public void OnCompleted()
@ -45,12 +58,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
public void OnNext(T value)
{
if (_cancellationToken.IsCancellationRequested)
{
// Noop, someone else is handling the cancellation
return;
}
// This will block the thread emitting the object if the channel is bounded and full
// I think this is OK, since we want to push the backpressure up. However, we may need
// to find a way to force the entire subscription off to a dedicated thread in order to
@ -60,10 +67,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
while (!_output.TryWrite(value))
{
// Wait for a spot
if (!_output.WaitToWriteAsync(_cancellationToken).Result)
if (!_output.WaitToWriteAsync().Result)
{
// Channel was closed.
throw new InvalidOperationException("Output channel was closed");
// Channel was closed so we just no-op. The observer shouldn't throw.
return;
}
}
}

View File

@ -148,6 +148,49 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task OberserverDoesntThrowWhenOnNextIsCalledAfterChannelIsCompleted()
{
var observable = new Observable<int>();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable));
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<ObservableHub>>();
var waitForSubscribe = new TaskCompletionSource<object>();
observable.OnSubscribe = o =>
{
waitForSubscribe.TrySetResult(null);
};
var waitForDispose = new TaskCompletionSource<object>();
observable.OnDispose = o =>
{
waitForDispose.TrySetResult(null);
};
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
var subscribeTask = client.StreamAsync(nameof(ObservableHub.Subscribe));
await waitForSubscribe.Task.OrTimeout();
Assert.Single(observable.Observers);
// Disposing the client to complete the observer. Further calls to OnNext should no-op
client.Dispose();
// Calling OnNext after the client has disconnected shouldn't throw.
observable.OnNext(1);
await waitForDispose.Task.OrTimeout();
Assert.Empty(observable.Observers);
await connectionHandlerTask.OrTimeout();
}
}
[Fact]
public async Task ObservableHubRemovesSubscriptions()
{