Fix channel stream cancellation

This commit is contained in:
Brennan 2019-04-23 10:08:12 -07:00
parent 5df3033882
commit 3f846d93b8
2 changed files with 8 additions and 5 deletions

View File

@ -23,9 +23,12 @@ namespace Microsoft.AspNetCore.SignalR.Internal
return new CancelableTypedAsyncEnumerable<T>(asyncEnumerable, cts);
}
public static IAsyncEnumerable<object> MakeCancelableAsyncEnumerableFromChannel<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default)
public static async IAsyncEnumerable<object> MakeAsyncEnumerableFromChannel<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default)
{
return MakeCancelableAsyncEnumerable(channel.ReadAllAsync(), cancellationToken);
await foreach (var item in channel.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
private class CancelableTypedAsyncEnumerable<TResult> : IAsyncEnumerable<TResult>

View File

@ -19,9 +19,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable)) && m.IsGenericMethod);
private static readonly MethodInfo MakeCancelableAsyncEnumerableFromChannelMethod = typeof(AsyncEnumerableAdapters)
private static readonly MethodInfo MakeAsyncEnumerableFromChannelMethod = typeof(AsyncEnumerableAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.MakeCancelableAsyncEnumerableFromChannel)) && m.IsGenericMethod);
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.MakeAsyncEnumerableFromChannel)) && m.IsGenericMethod);
private readonly MethodInfo _makeCancelableEnumerableMethodInfo;
private Func<object, CancellationToken, IAsyncEnumerable<object>> _makeCancelableEnumerable;
@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
if (openReturnType == typeof(ChannelReader<>))
{
StreamReturnType = returnType.GetGenericArguments()[0];
_makeCancelableEnumerableMethodInfo = MakeCancelableAsyncEnumerableFromChannelMethod;
_makeCancelableEnumerableMethodInfo = MakeAsyncEnumerableFromChannelMethod;
break;
}
}