Removing native support for IObservable<T> (#1890)

- There are too many issues and questions with respect to back pressure and the buffering policy we should use when the client being streamed to can't support the data being pushed via OnNext.
As a result, we're dropping support for IObservable but keeping ChannelReader and we'll eventually support IAsyncEnumerable when that makes it into the BCL.
- Add sample showing Observable -> ChannelReader adaption
This commit is contained in:
David Fowler 2018-04-07 15:10:39 -07:00 committed by GitHub
parent 05ebd10258
commit 86083c0302
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 81 additions and 577 deletions

View File

@ -93,8 +93,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
public class TestHub : Hub
{
private static readonly IObservable<int> ObservableInstance = Observable.Empty<int>();
public void Invocation()
{
}
@ -119,21 +117,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
return new ValueTask<int>(1);
}
public IObservable<int> StreamObservable()
{
return ObservableInstance;
}
public Task<IObservable<int>> StreamObservableAsync()
{
return Task.FromResult(ObservableInstance);
}
public ValueTask<IObservable<int>> StreamObservableValueTaskAsync()
{
return new ValueTask<IObservable<int>>(ObservableInstance);
}
public ChannelReader<int> StreamChannelReader()
{
var channel = Channel.CreateUnbounded<int>();
@ -173,11 +156,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
return channel.Reader;
}
public IObservable<int> StreamObservableCount(int count)
{
return Observable.Range(0, count);
}
}
[Benchmark]
@ -210,24 +188,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationValueTaskAsync", null));
}
[Benchmark]
public Task StreamObservable()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservable", null));
}
[Benchmark]
public Task StreamObservableAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservableAsync", null));
}
[Benchmark]
public Task StreamObservableValueTaskAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservableValueTaskAsync", null));
}
[Benchmark]
public Task StreamChannelReader()
{
@ -263,23 +223,5 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", argumentBindingException: null, new object[] { 1000 }));
}
[Benchmark]
public Task StreamObservableCount_Zero()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservableCount", argumentBindingException: null, new object[] { 0 }));
}
[Benchmark]
public Task StreamObservableCount_One()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservableCount", argumentBindingException: null, new object[] { 1 }));
}
[Benchmark]
public Task StreamObservableCount_Thousand()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamObservableCount", argumentBindingException: null, new object[] { 1000 }));
}
}
}

View File

@ -3,6 +3,7 @@
using System;
using System.Reactive.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR;
@ -38,17 +39,24 @@ namespace FunctionalTests
return Clients.Client(Context.ConnectionId).SendAsync("CustomObject", customObject);
}
public IObservable<string> Stream()
public ChannelReader<string> Stream()
{
return new string[] { "a", "b", "c" }.ToObservable();
var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("a");
channel.Writer.TryWrite("b");
channel.Writer.TryWrite("c");
channel.Writer.Complete();
return channel.Reader;
}
public IObservable<int> EmptyStream()
public ChannelReader<int> EmptyStream()
{
return Array.Empty<int>().ToObservable();
var channel = Channel.CreateUnbounded<int>();
channel.Writer.Complete();
return channel.Reader;
}
public IObservable<string> StreamThrowException(string message)
public ChannelReader<string> StreamThrowException(string message)
{
throw new InvalidOperationException(message);
}

View File

@ -1,8 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000;
export function getParameterByName(name: string) {
const url = window.location.href;
name = name.replace(/[\[\]]/g, "\\$&");

View File

@ -11,11 +11,13 @@ namespace SignalRSamples.Hubs
{
public class Streaming : Hub
{
public IObservable<int> ObservableCounter(int count, int delay)
public ChannelReader<int> ObservableCounter(int count, int delay)
{
return Observable.Interval(TimeSpan.FromMilliseconds(delay))
var observable = Observable.Interval(TimeSpan.FromMilliseconds(delay))
.Select((_, index) => index)
.Take(count);
return observable.AsChannelReader();
}
public ChannelReader<int> ChannelCounter(int count, int delay)

View File

@ -0,0 +1,36 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Reactive.Linq;
using System.Threading.Channels;
namespace SignalRSamples
{
public static class ObservableExtensions
{
public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable)
{
// This sample shows adapting an observable to a ChannelReader without
// back pressure, if the connection is slower than the producer, memory will
// start to increase.
// If the channel is unbounded, TryWrite will return false and effectively
// drop items.
// The other alternative is to use a bounded channel, and when the limit is reached
// block on WaitToWriteAsync. This will block a thread pool thread and isn't recommended
var channel = Channel.CreateUnbounded<T>();
var disposable = observable.Subscribe(
value => channel.Writer.TryWrite(value),
error => channel.Writer.TryComplete(error),
() => channel.Writer.TryComplete());
// Complete the subscription on the reader completing
channel.Reader.Completion.ContinueWith(task => disposable.Dispose());
return channel.Reader;
}
}
}

View File

@ -1,54 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.SignalR.Client
{
internal class CastObservable<TResult> : IObservable<TResult>
{
private IObservable<object> _innerObservable;
public CastObservable(IObservable<object> innerObservable)
{
_innerObservable = innerObservable;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
return _innerObservable.Subscribe(new CastObserver(observer));
}
private class CastObserver : IObserver<object>
{
private IObserver<TResult> _innerObserver;
public CastObserver(IObserver<TResult> innerObserver)
{
_innerObserver = innerObserver;
}
public void OnCompleted()
{
_innerObserver.OnCompleted();
}
public void OnError(Exception error)
{
_innerObserver.OnError(error);
}
public void OnNext(object value)
{
try
{
_innerObserver.OnNext((TResult)value);
}
catch(Exception ex)
{
_innerObserver.OnError(ex);
}
}
}
}
}

View File

@ -11,71 +11,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
// True-internal because this is a weird and tricky class to use :)
internal static class AsyncEnumeratorAdapters
{
public static IAsyncEnumerator<object> FromObservable<T>(IObservable<T> observable, CancellationToken cancellationToken)
{
// TODO: Allow bounding and optimizations?
var channel = Channel.CreateUnbounded<object>();
var observer = new ObserverState();
var channelObserver = new ChannelObserver<T>(channel.Writer);
observer.Subscription = observable.Subscribe(channelObserver);
observer.TokenRegistration = cancellationToken.Register(obs => ((ChannelObserver<T>)obs).OnCompleted(), channelObserver);
// Make sure the subscription and token registration is disposed when enumeration is completed.
return new AsyncEnumerator<object>(channel.Reader, cancellationToken, observer);
}
// To track and dispose of the Subscription and the cancellation token registration.
private class ObserverState : IDisposable
{
public CancellationTokenRegistration TokenRegistration;
public IDisposable Subscription;
public void Dispose()
{
TokenRegistration.Dispose();
Subscription.Dispose();
}
}
private class ChannelObserver<T> : IObserver<T>
{
private readonly ChannelWriter<object> _output;
public ChannelObserver(ChannelWriter<object> output)
{
_output = output;
}
public void OnCompleted()
{
_output.TryComplete();
}
public void OnError(Exception error)
{
_output.TryComplete(error);
}
public void OnNext(T value)
{
// This will block the thread emitting the object if the channel is bounded and full
// I think this is OK, since we want to push the backpressure up. However, we may need
// to find a way to force the entire subscription off to a dedicated thread in order to
// ensure we don't block other tasks
// Right now however, we use unbounded channels, so all of the above is moot because TryWrite will always succeed
while (!_output.TryWrite(value))
{
// Wait for a spot
if (!_output.WaitToWriteAsync().Result)
{
// Channel was closed so we just no-op. The observer shouldn't throw.
return;
}
}
}
}
public static IAsyncEnumerator<object> GetAsyncEnumerator<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default(CancellationToken))
{
// Nothing to dispose when we finish enumerating in this case.

View File

@ -198,7 +198,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name);
await SendInvocationError(hubMethodInvocationMessage, connection,
$"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is null, does not implement the IObservable<> interface or is not a ReadableChannel<>.");
$"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<>.");
return;
}
@ -364,13 +364,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
if (result != null)
{
if (hubMethodDescriptor.IsObservable)
{
streamCts = CreateCancellation();
enumerator = hubMethodDescriptor.FromObservable(result, streamCts.Token);
return true;
}
if (hubMethodDescriptor.IsChannel)
{
streamCts = CreateCancellation();

View File

@ -15,10 +15,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
internal class HubMethodDescriptor
{
private static readonly MethodInfo FromObservableMethod = typeof(AsyncEnumeratorAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(AsyncEnumeratorAdapters.FromObservable)) && m.IsGenericMethod);
private static readonly MethodInfo GetAsyncEnumeratorMethod = typeof(AsyncEnumeratorAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(AsyncEnumeratorAdapters.GetAsyncEnumerator)) && m.IsGenericMethod);
@ -33,12 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
? MethodExecutor.AsyncResultType
: MethodExecutor.MethodReturnType;
if (IsObservableType(NonAsyncReturnType, out var observableItemType))
{
IsObservable = true;
StreamReturnType = observableItemType;
}
else if (IsChannelType(NonAsyncReturnType, out var channelItemType))
if (IsChannelType(NonAsyncReturnType, out var channelItemType))
{
IsChannel = true;
StreamReturnType = channelItemType;
@ -53,11 +44,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
public Type NonAsyncReturnType { get; }
public bool IsObservable { get; }
public bool IsChannel { get; }
public bool IsStreamable => IsObservable || IsChannel;
public bool IsStreamable => IsChannel;
public Type StreamReturnType { get; }
@ -76,35 +65,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
return true;
}
private static bool IsObservableType(Type type, out Type payloadType)
{
var observableInterface = IsIObservable(type) ? type : type.GetInterfaces().FirstOrDefault(IsIObservable);
if (observableInterface == null)
{
payloadType = null;
return false;
}
payloadType = observableInterface.GetGenericArguments()[0];
return true;
bool IsIObservable(Type iface)
{
return iface.IsGenericType && iface.GetGenericTypeDefinition() == typeof(IObservable<>);
}
}
public IAsyncEnumerator<object> FromObservable(object observable, CancellationToken cancellationToken)
{
// there is the potential for compile to be called times but this has no harmful effect other than perf
if (_convertToEnumerator == null)
{
_convertToEnumerator = CompileConvertToEnumerator(FromObservableMethod, StreamReturnType);
}
return _convertToEnumerator.Invoke(observable, cancellationToken);
}
public IAsyncEnumerator<object> FromChannel(object channel, CancellationToken cancellationToken)
{
// there is the potential for compile to be called times but this has no harmful effect other than perf
@ -120,10 +80,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
// This will call one of two adapter methods to wrap the passed in streamable value
// and cancellation token to an IAsyncEnumerator<object>
//
// IObservable<T>:
// AsyncEnumeratorAdapters.FromObservable<T>(observable, cancellationToken);
//
// ChannelReader<T>
// AsyncEnumeratorAdapters.GetAsyncEnumerator<T>(channelReader, cancellationToken);

View File

@ -681,7 +681,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsChannelAsync<int>("StreamBroken").OrTimeout();
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync()).OrTimeout();
Assert.Equal("The value returned by the streaming method 'StreamBroken' is null, does not implement the IObservable<> interface or is not a ReadableChannel<>.", ex.Message);
Assert.Equal("The value returned by the streaming method 'StreamBroken' is not a ChannelReader<>.", ex.Message);
}
catch (Exception ex)
{

View File

@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public string Echo(string message) => TestHubMethodsImpl.Echo(message);
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public string Echo(string message) => TestHubMethodsImpl.Echo(message);
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public string Echo(string message) => TestHubMethodsImpl.Echo(message);
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
@ -139,11 +139,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
return message;
}
public static IObservable<int> Stream(int count)
public static ChannelReader<int> Stream(int count)
{
return Observable.Interval(TimeSpan.FromMilliseconds(1))
.Select((_, index) => index)
.Take(count);
var channel = Channel.CreateUnbounded<int>();
Task.Run(async () =>
{
for (var i = 0; i < count; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(100);
}
channel.Writer.TryComplete();
});
return channel.Reader;
}
public static ChannelReader<int> StreamException()

View File

@ -85,212 +85,6 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task ObservableHubRemovesSubscriptionsWithInfiniteStreams()
{
var observable = new Observable<int>();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable));
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<ObservableHub>>();
var waitForSubscribe = new TaskCompletionSource<object>();
observable.OnSubscribe = o =>
{
waitForSubscribe.TrySetResult(null);
};
var waitForDispose = new TaskCompletionSource<object>();
observable.OnDispose = o =>
{
waitForDispose.TrySetResult(null);
};
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
async Task Produce()
{
var i = 0;
while (true)
{
observable.OnNext(i++);
await Task.Delay(100);
}
}
_ = Produce();
Assert.Empty(observable.Observers);
var subscribeTask = client.StreamAsync(nameof(ObservableHub.Subscribe));
await waitForSubscribe.Task.OrTimeout();
Assert.Single(observable.Observers);
client.Dispose();
// We don't care if this throws, we just expect it to complete
try
{
await subscribeTask.OrTimeout();
}
catch
{
}
await waitForDispose.Task.OrTimeout();
Assert.Empty(observable.Observers);
await connectionHandlerTask.OrTimeout();
}
}
[Fact]
public async Task OberserverDoesntThrowWhenOnNextIsCalledAfterChannelIsCompleted()
{
var observable = new Observable<int>();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable));
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<ObservableHub>>();
var waitForSubscribe = new TaskCompletionSource<object>();
observable.OnSubscribe = o =>
{
waitForSubscribe.TrySetResult(null);
};
var waitForDispose = new TaskCompletionSource<object>();
observable.OnDispose = o =>
{
waitForDispose.TrySetResult(null);
};
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
var subscribeTask = client.StreamAsync(nameof(ObservableHub.Subscribe));
await waitForSubscribe.Task.OrTimeout();
Assert.Single(observable.Observers);
// Disposing the client to complete the observer. Further calls to OnNext should no-op
client.Dispose();
// Calling OnNext after the client has disconnected shouldn't throw.
observable.OnNext(1);
await waitForDispose.Task.OrTimeout();
Assert.Empty(observable.Observers);
await connectionHandlerTask.OrTimeout();
}
}
[Fact]
public async Task ObservableHubRemovesSubscriptions()
{
var observable = new Observable<int>();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable));
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<ObservableHub>>();
var waitForSubscribe = new TaskCompletionSource<object>();
observable.OnSubscribe = o =>
{
waitForSubscribe.TrySetResult(null);
};
var waitForDispose = new TaskCompletionSource<object>();
observable.OnDispose = o =>
{
waitForDispose.TrySetResult(null);
};
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
async Task Subscribe()
{
var results = await client.StreamAsync(nameof(ObservableHub.Subscribe));
var items = results.OfType<StreamItemMessage>().ToList();
Assert.Single(items);
Assert.Equal(2, (long)items[0].Item);
}
observable.OnNext(1);
Assert.Empty(observable.Observers);
var subscribeTask = Subscribe();
await waitForSubscribe.Task.OrTimeout();
Assert.Single(observable.Observers);
observable.OnNext(2);
observable.Complete();
await subscribeTask.OrTimeout();
client.Dispose();
await waitForDispose.Task.OrTimeout();
Assert.Empty(observable.Observers);
await connectionHandlerTask.OrTimeout();
}
}
[Fact]
public async Task ObservableHubRemovesSubscriptionWhenCanceledFromClient()
{
var observable = new Observable<int>();
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(s => s.AddSingleton(observable));
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<ObservableHub>>();
var waitForSubscribe = new TaskCompletionSource<object>();
observable.OnSubscribe = o =>
{
waitForSubscribe.TrySetResult(null);
};
var waitForDispose = new TaskCompletionSource<object>();
observable.OnDispose = o =>
{
waitForDispose.TrySetResult(null);
};
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
var invocationId = await client.SendStreamInvocationAsync(nameof(ObservableHub.Subscribe)).OrTimeout();
await waitForSubscribe.Task.OrTimeout();
await client.SendHubMessageAsync(new CancelInvocationMessage(invocationId)).OrTimeout();
await waitForDispose.Task.OrTimeout();
var message = await client.ReadAsync().OrTimeout();
Assert.IsType<CompletionMessage>(message);
client.Dispose();
await connectionHandlerTask.OrTimeout();
}
}
[Fact]
public async Task MissingHandshakeAndMessageSentFromHubConnectionCanBeDisposedCleanly()
{
@ -1677,7 +1471,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
Assert.NotNull(completion);
if (detailedErrors)
{
Assert.Equal("An error occurred on the server while streaming results. Exception: Exception from observable", completion.Error);
Assert.Equal("An error occurred on the server while streaming results. Exception: Exception from channel", completion.Error);
}
else
{
@ -1733,8 +1527,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
foreach (var method in new[]
{
nameof(StreamingHub.CounterChannel), nameof(StreamingHub.CounterChannelAsync), nameof(StreamingHub.CounterChannelValueTaskAsync),
nameof(StreamingHub.CounterObservable), nameof(StreamingHub.CounterObservableAsync), nameof(StreamingHub.CounterObservableValueTaskAsync)
nameof(StreamingHub.CounterChannel), nameof(StreamingHub.CounterChannelAsync), nameof(StreamingHub.CounterChannelValueTaskAsync)
})
{
foreach (var protocolName in HubProtocolHelpers.AllProtocolNames)

View File

@ -421,18 +421,6 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public class ObservableHub : Hub
{
private readonly Observable<int> _numbers;
public ObservableHub(Observable<int> numbers)
{
_numbers = numbers;
}
public IObservable<int> Subscribe() => _numbers;
}
public class AbortHub : Hub
{
public void Kill()
@ -441,89 +429,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
public class Observable<T> : IObservable<T>
{
public List<IObserver<T>> Observers = new List<IObserver<T>>();
public Action<IObserver<T>> OnSubscribe;
public Action<IObserver<T>> OnDispose;
public IDisposable Subscribe(IObserver<T> observer)
{
lock (Observers)
{
Observers.Add(observer);
}
OnSubscribe?.Invoke(observer);
return new DisposableAction(() =>
{
lock (Observers)
{
Observers.Remove(observer);
}
OnDispose?.Invoke(observer);
});
}
public void OnNext(T value)
{
lock (Observers)
{
foreach (var observer in Observers)
{
observer.OnNext(value);
}
}
}
public void Complete()
{
lock (Observers)
{
foreach (var observer in Observers)
{
observer.OnCompleted();
}
}
}
private class DisposableAction : IDisposable
{
private readonly Action _action;
public DisposableAction(Action action)
{
_action = action;
}
public void Dispose()
{
_action();
}
}
}
public class StreamingHub : TestHub
{
public IObservable<string> CounterObservable(int count)
{
return new CountingObservable(count);
}
public async Task<IObservable<string>> CounterObservableAsync(int count)
{
await Task.Yield();
return CounterObservable(count);
}
public async ValueTask<IObservable<string>> CounterObservableValueTaskAsync(int count)
{
await Task.Yield();
return CounterObservable(count);
}
public ChannelReader<string> CounterChannel(int count)
{
@ -558,34 +465,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
return Channel.CreateUnbounded<string>().Reader;
}
public IObservable<int> ThrowStream()
public ChannelReader<int> ThrowStream()
{
return Observable.Throw<int>(new Exception("Exception from observable"));
}
private class CountingObservable : IObservable<string>
{
private int _count;
public CountingObservable(int count)
{
_count = count;
}
public IDisposable Subscribe(IObserver<string> observer)
{
var cts = new CancellationTokenSource();
Task.Run(() =>
{
for (int i = 0; !cts.Token.IsCancellationRequested && i < _count; i++)
{
observer.OnNext(i.ToString());
}
observer.OnCompleted();
});
return new CancellationDisposable(cts);
}
var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryComplete(new Exception("Exception from channel"));
return channel.Reader;
}
}