diff --git a/samples/SignalRSamples/ObservableExtensions.cs b/samples/SignalRSamples/ObservableExtensions.cs index 04f5a5b549..8cd849b92b 100644 --- a/samples/SignalRSamples/ObservableExtensions.cs +++ b/samples/SignalRSamples/ObservableExtensions.cs @@ -9,18 +9,18 @@ namespace SignalRSamples { public static class ObservableExtensions { - public static ChannelReader AsChannelReader(this IObservable observable) + public static ChannelReader AsChannelReader(this IObservable observable, int? maxBufferSize = null) { // This sample shows adapting an observable to a ChannelReader without // back pressure, if the connection is slower than the producer, memory will // start to increase. - // If the channel is unbounded, TryWrite will return false and effectively + // If the channel is bounded, TryWrite will return false and effectively // drop items. // The other alternative is to use a bounded channel, and when the limit is reached - // block on WaitToWriteAsync. This will block a thread pool thread and isn't recommended - var channel = Channel.CreateUnbounded(); + // block on WaitToWriteAsync. This will block a thread pool thread and isn't recommended and isn't shown here. + var channel = maxBufferSize != null ? Channel.CreateBounded(maxBufferSize.Value) : Channel.CreateUnbounded(); var disposable = observable.Subscribe( value => channel.Writer.TryWrite(value),