From 0adbfc6d25a5d12b0e89e0b1e04a4f953a3c6ba3 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Thu, 9 May 2019 21:08:20 -0700 Subject: [PATCH] Revert "Remove CancelableEnumerator (#10099)" (#10129) --- .../common/Shared/AsyncEnumerableAdapters.cs | 31 +++++++++++++++++++ .../Core/src/Internal/DefaultHubDispatcher.cs | 5 +-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs index 93556e8d94..6ebd1dcb25 100644 --- a/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs +++ b/src/SignalR/common/Shared/AsyncEnumerableAdapters.cs @@ -51,10 +51,37 @@ namespace Microsoft.AspNetCore.SignalR.Internal { ((CancellationTokenSource)ctsState).Cancel(); }, _cts); + + return new CancelableEnumerator(_asyncEnumerable.GetAsyncEnumerator(), registration); } return enumerator; } + + private class CancelableEnumerator : IAsyncEnumerator + { + private IAsyncEnumerator _asyncEnumerator; + private readonly CancellationTokenRegistration _cancellationTokenRegistration; + + public T Current => (T)_asyncEnumerator.Current; + + public CancelableEnumerator(IAsyncEnumerator asyncEnumerator, CancellationTokenRegistration registration) + { + _asyncEnumerator = asyncEnumerator; + _cancellationTokenRegistration = registration; + } + + public ValueTask MoveNextAsync() + { + return _asyncEnumerator.MoveNextAsync(); + } + + public ValueTask DisposeAsync() + { + _cancellationTokenRegistration.Dispose(); + return _asyncEnumerator.DisposeAsync(); + } + } } /// Converts an IAsyncEnumerable of T to an IAsyncEnumerable of object. @@ -71,6 +98,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { + // Assume that this will be iterated through with await foreach which always passes a default token. + // Instead use the token from the ctor. + Debug.Assert(cancellationToken == default); + var enumeratorOfT = _asyncEnumerable.GetAsyncEnumerator(_cancellationToken); return enumeratorOfT as IAsyncEnumerator ?? new BoxedAsyncEnumerator(enumeratorOfT); } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 6d097d731d..ee39dcad9d 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -403,12 +403,13 @@ namespace Microsoft.AspNetCore.SignalR.Internal IHubActivator hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage) { string error = null; + try { - await foreach(var item in enumerable.WithCancellation(streamCts.Token)) + await foreach (var streamItem in enumerable) { // Send the stream item - await connection.WriteAsync(new StreamItemMessage(invocationId, item)); + await connection.WriteAsync(new StreamItemMessage(invocationId, streamItem)); } } catch (ChannelClosedException ex)