Use RX in streaming sample

This commit is contained in:
David Fowler 2017-06-03 11:45:36 -07:00
parent 42e2715a95
commit cad9f2f671
2 changed files with 5 additions and 47 deletions

View File

@ -1,6 +1,5 @@
using System;
using System.IO;
using System.Threading;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR;
@ -11,7 +10,9 @@ namespace SocketsSample.Hubs
{
public IObservable<int> ObservableCounter(int count, int delay)
{
return new CounterObservable(count, delay);
return Observable.Interval(TimeSpan.FromMilliseconds(delay))
.Select((_, index) => index)
.Take(count);
}
public ReadableChannel<int> ChannelCounter(int count, int delay)
@ -31,49 +32,5 @@ namespace SocketsSample.Hubs
return channel.In;
}
private class CounterObservable : IObservable<int>
{
private int _count;
private int _delay;
public CounterObservable(int count, int delay)
{
_count = count;
_delay = delay;
}
public IDisposable Subscribe(IObserver<int> observer)
{
// Run in a thread-pool thread
var cts = new CancellationTokenSource();
Task.Run(async () =>
{
for (var i = 0; !cts.Token.IsCancellationRequested && i < _count; i++)
{
observer.OnNext(i);
await Task.Delay(_delay);
}
observer.OnCompleted();
});
return new Disposable(() => cts.Cancel());
}
}
private class Disposable : IDisposable
{
private Action _action;
public Disposable(Action action)
{
_action = action;
}
public void Dispose()
{
_action();
}
}
}
}

View File

@ -29,6 +29,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufVersion)" />
<PackageReference Include="System.Reactive.Linq" Version="$(RxVersion)" />
</ItemGroup>
<Target Name="CopyTSClient" BeforeTargets="AfterBuild">