IAsyncEnumerable Hub Parameters (#9763)

This commit is contained in:
Mikael Mengistu 2019-04-30 16:51:05 -07:00 committed by GitHub
parent 0b7458cc98
commit f30d79e9ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 143 additions and 7 deletions

View File

@ -357,6 +357,52 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
}
}
[Theory]
[InlineData("json")]
[InlineData("messagepack")]
public async Task CanStreamToHubWithIAsyncEnumerableMethodArg(string protocolName)
{
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server))
{
var connection = CreateHubConnection(server.Url, "/default", HttpTransportType.WebSockets, protocol, LoggerFactory);
try
{
async IAsyncEnumerable<int> ClientStreamData(int value)
{
for (var i = 0; i < value; i++)
{
yield return i;
await Task.Delay(10);
}
}
var streamTo = 5;
var stream = ClientStreamData(streamTo);
await connection.StartAsync().OrTimeout();
var expectedValue = 0;
var asyncEnumerable = connection.StreamAsync<int>("StreamIAsyncConsumer", stream);
await foreach (var streamValue in asyncEnumerable)
{
Assert.Equal(expectedValue, streamValue);
expectedValue++;
}
Assert.Equal(streamTo, expectedValue);
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}
[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]

View File

@ -45,6 +45,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public ChannelReader<int> StreamEchoInt(ChannelReader<int> source) => TestHubMethodsImpl.StreamEchoInt(source);
public IAsyncEnumerable<int> StreamIAsyncConsumer(IAsyncEnumerable<int> source) => source;
public string GetUserIdentifier()
{
return Context.UserIdentifier;
@ -125,6 +127,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public ChannelReader<string> StreamEcho(ChannelReader<string> source) => TestHubMethodsImpl.StreamEcho(source);
public ChannelReader<int> StreamEchoInt(ChannelReader<int> source) => TestHubMethodsImpl.StreamEchoInt(source);
public IAsyncEnumerable<int> StreamIAsyncConsumer(IAsyncEnumerable<int> source) => source;
}
public class TestHubT : Hub<ITestHub>
@ -157,6 +161,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public ChannelReader<string> StreamEcho(ChannelReader<string> source) => TestHubMethodsImpl.StreamEcho(source);
public ChannelReader<int> StreamEchoInt(ChannelReader<int> source) => TestHubMethodsImpl.StreamEchoInt(source);
public IAsyncEnumerable<int> StreamIAsyncConsumer(IAsyncEnumerable<int> source) => source;
}
internal static class TestHubMethodsImpl

View File

@ -38,6 +38,11 @@ namespace Microsoft.AspNetCore.SignalR
#if NETCOREAPP3_0
public static bool IsIAsyncEnumerable(Type type)
{
if (type.IsGenericType)
{
return type.GetGenericTypeDefinition() == typeof(IAsyncEnumerable<>);
}
return type.GetInterfaces().Any(t =>
{
if (t.IsGenericType)

View File

@ -277,7 +277,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
{
Log.StartingParameterStream(_logger, hubMethodInvocationMessage.StreamIds[streamPointer]);
var itemType = descriptor.StreamingParameters[streamPointer];
arguments[parameterPointer] = connection.StreamTracker.AddStream(hubMethodInvocationMessage.StreamIds[streamPointer], itemType);
arguments[parameterPointer] = connection.StreamTracker.AddStream(hubMethodInvocationMessage.StreamIds[streamPointer],
itemType, descriptor.OriginalParameterTypes[parameterPointer]);
streamPointer++;
}
else

View File

@ -5,4 +5,4 @@ using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.SignalR.Tests.Utils, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.SignalR.Microbenchmarks, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.SignalR.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.SignalR.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]

View File

@ -21,11 +21,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <summary>
/// Creates a new stream and returns the ChannelReader for it as an object.
/// </summary>
public object AddStream(string streamId, Type itemType)
public object AddStream(string streamId, Type itemType, Type targetType)
{
var newConverter = (IStreamConverter)_buildConverterMethod.MakeGenericMethod(itemType).Invoke(null, Array.Empty<object>());
_lookup[streamId] = newConverter;
return newConverter.GetReaderAsObject();
return newConverter.GetReaderAsObject(targetType);
}
private bool TryGetConverter(string streamId, out IStreamConverter converter)
@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.SignalR
private interface IStreamConverter
{
Type GetItemType();
object GetReaderAsObject();
object GetReaderAsObject(Type type);
Task WriteToStream(object item);
void TryComplete(Exception ex);
}
@ -100,9 +100,16 @@ namespace Microsoft.AspNetCore.SignalR
return typeof(T);
}
public object GetReaderAsObject()
public object GetReaderAsObject(Type type)
{
return _channel.Reader;
if (ReflectionHelper.IsIAsyncEnumerable(type))
{
return _channel.Reader.ReadAllAsync();
}
else
{
return _channel.Reader;
}
}
public Task WriteToStream(object o)

View File

@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Tests.Internal
{
public class ReflectionHelperTests
{
[Theory]
[MemberData(nameof(TypesToCheck))]
public void IsIAsyncEnumerableTests(Type type, bool expectedOutcome)
{
Assert.Equal(expectedOutcome, ReflectionHelper.IsIAsyncEnumerable(type));
}
public static IEnumerable<object[]> TypesToCheck()
{
yield return new object[]
{
typeof(IAsyncEnumerable<object>),
true
};
yield return new object[]
{
typeof(ChannelReader<object>),
false
};
async IAsyncEnumerable<int> Stream()
{
await Task.Delay(10);
yield return 1;
}
object streamAsObject = Stream();
yield return new object[]
{
streamAsObject.GetType(),
true
};
yield return new object[]
{
typeof(string),
false
};
yield return new object[]
{
typeof(CustomAsyncEnumerable),
true
};
}
private class CustomAsyncEnumerable : IAsyncEnumerable<object>
{
public IAsyncEnumerator<object> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
}
}