Flow a cancellation token in to AsChannelReader (#2491)

This commit is contained in:
Kai Ruhnau 2018-06-15 01:33:59 +02:00 committed by Andrew Stanton-Nurse
parent 422e3c98f8
commit e4e9bd1a3c
2 changed files with 13 additions and 3 deletions

View File

@ -17,7 +17,7 @@ namespace SignalRSamples.Hubs
.Select((_, index) => index)
.Take(count);
return observable.AsChannelReader();
return observable.AsChannelReader(Context.ConnectionAborted);
}
public ChannelReader<int> ChannelCounter(int count, int delay)

View File

@ -3,13 +3,18 @@
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
namespace SignalRSamples
{
public static class ObservableExtensions
{
public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable, int? maxBufferSize = null)
public static ChannelReader<T> AsChannelReader<T>(
this IObservable<T> observable,
CancellationToken connectionAborted,
int? maxBufferSize = null
)
{
// This sample shows adapting an observable to a ChannelReader without
// back pressure, if the connection is slower than the producer, memory will
@ -26,9 +31,14 @@ namespace SignalRSamples
value => channel.Writer.TryWrite(value),
error => channel.Writer.TryComplete(error),
() => channel.Writer.TryComplete());
var abortRegistration = connectionAborted.Register(() => channel.Writer.TryComplete());
// Complete the subscription on the reader completing
channel.Reader.Completion.ContinueWith(task => disposable.Dispose());
channel.Reader.Completion.ContinueWith(task =>
{
disposable.Dispose();
abortRegistration.Dispose();
});
return channel.Reader;
}