diff --git a/samples/SocketsSample/Hubs/Streaming.cs b/samples/SocketsSample/Hubs/Streaming.cs index e8f26ef0df..63fa8b71e1 100644 --- a/samples/SocketsSample/Hubs/Streaming.cs +++ b/samples/SocketsSample/Hubs/Streaming.cs @@ -1,6 +1,5 @@ using System; -using System.IO; -using System.Threading; +using System.Reactive.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.SignalR; @@ -11,7 +10,9 @@ namespace SocketsSample.Hubs { public IObservable ObservableCounter(int count, int delay) { - return new CounterObservable(count, delay); + return Observable.Interval(TimeSpan.FromMilliseconds(delay)) + .Select((_, index) => index) + .Take(count); } public ReadableChannel ChannelCounter(int count, int delay) @@ -31,49 +32,5 @@ namespace SocketsSample.Hubs return channel.In; } - - private class CounterObservable : IObservable - { - private int _count; - private int _delay; - - public CounterObservable(int count, int delay) - { - _count = count; - _delay = delay; - } - - public IDisposable Subscribe(IObserver observer) - { - // Run in a thread-pool thread - var cts = new CancellationTokenSource(); - Task.Run(async () => - { - for (var i = 0; !cts.Token.IsCancellationRequested && i < _count; i++) - { - observer.OnNext(i); - await Task.Delay(_delay); - } - observer.OnCompleted(); - }); - - return new Disposable(() => cts.Cancel()); - } - } - - private class Disposable : IDisposable - { - private Action _action; - - public Disposable(Action action) - { - _action = action; - } - - public void Dispose() - { - _action(); - } - } } } diff --git a/samples/SocketsSample/SocketsSample.csproj b/samples/SocketsSample/SocketsSample.csproj index 63fa1532b9..6b672a047f 100644 --- a/samples/SocketsSample/SocketsSample.csproj +++ b/samples/SocketsSample/SocketsSample.csproj @@ -29,6 +29,7 @@ +