Add support for creating a bounded channel in helper
This commit is contained in:
parent
0d9c3d8898
commit
767f2900f9
|
|
@ -9,18 +9,18 @@ namespace SignalRSamples
|
|||
{
|
||||
public static class ObservableExtensions
|
||||
{
|
||||
public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable)
|
||||
public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable, int? maxBufferSize = null)
|
||||
{
|
||||
// This sample shows adapting an observable to a ChannelReader without
|
||||
// back pressure, if the connection is slower than the producer, memory will
|
||||
// start to increase.
|
||||
|
||||
// If the channel is unbounded, TryWrite will return false and effectively
|
||||
// If the channel is bounded, TryWrite will return false and effectively
|
||||
// drop items.
|
||||
|
||||
// The other alternative is to use a bounded channel, and when the limit is reached
|
||||
// block on WaitToWriteAsync. This will block a thread pool thread and isn't recommended
|
||||
var channel = Channel.CreateUnbounded<T>();
|
||||
// block on WaitToWriteAsync. This will block a thread pool thread and isn't recommended and isn't shown here.
|
||||
var channel = maxBufferSize != null ? Channel.CreateBounded<T>(maxBufferSize.Value) : Channel.CreateUnbounded<T>();
|
||||
|
||||
var disposable = observable.Subscribe(
|
||||
value => channel.Writer.TryWrite(value),
|
||||
|
|
|
|||
Loading…
Reference in New Issue