Turned Stream into StreamAsync (#902)

* Turned Stream into StreamAsync
- Before we were fire and forgetting the invocation that initiated
the streaming, this changes that so that the caller now has to await
to get the channel.

#899
This commit is contained in:
David Fowler 2017-09-18 16:44:18 -07:00 committed by GitHub
parent a93e4be82f
commit 48d97bf271
4 changed files with 18 additions and 12 deletions

View File

@ -131,10 +131,15 @@ namespace Microsoft.AspNetCore.SignalR.Client
_handlers.AddOrUpdate(methodName, invocationHandler, (_, __) => invocationHandler);
}
public ReadableChannel<object> Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
public async Task<ReadableChannel<object>> StreamAsync(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
{
return await StreamAsyncCore(methodName, returnType, cancellationToken).ForceAsync();
}
private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
{
var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var channel);
_ = InvokeCore(methodName, irq, args, nonBlocking: false);
await InvokeCore(methodName, irq, args, nonBlocking: false);
return channel;
}

View File

@ -41,17 +41,17 @@ namespace Microsoft.AspNetCore.SignalR.Client
return hubConnection.SendAsync(methodName, CancellationToken.None, args);
}
public static ReadableChannel<TResult> Stream<TResult>(this HubConnection hubConnection, string methodName, params object[] args) =>
Stream<TResult>(hubConnection, methodName, CancellationToken.None, args);
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, params object[] args) =>
StreamAsync<TResult>(hubConnection, methodName, CancellationToken.None, args);
public static ReadableChannel<TResult> Stream<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken, params object[] args)
public static async Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken, params object[] args)
{
if (hubConnection == null)
{
throw new ArgumentNullException(nameof(hubConnection));
}
var inputChannel = hubConnection.Stream(methodName, typeof(TResult), cancellationToken, args);
var inputChannel = await hubConnection.StreamAsync(methodName, typeof(TResult), cancellationToken, args);
var outputChannel = Channel.CreateUnbounded<TResult>();
// Local function to provide a way to run async code as fire-and-forget

View File

@ -177,7 +177,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var tcs = new TaskCompletionSource<string>();
var results = await connection.Stream<string>("Stream").ReadAllAsync().OrTimeout();
var channel = await connection.StreamAsync<string>("Stream");
var results = await channel.ReadAllAsync().OrTimeout();
Assert.Equal(new[] { "a", "b", "c" }, results.ToArray());
}

View File

@ -98,7 +98,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<object>("Foo");
var channel = await hubConnection.StreamAsync<object>("Foo");
// skip negotiation
await connection.ReadSentTextMessageAsync().OrTimeout();
@ -148,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<int>("Foo");
var channel = await hubConnection.StreamAsync<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
@ -192,7 +192,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<string>("Foo");
var channel = await hubConnection.StreamAsync<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout();
@ -238,7 +238,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<int>("Foo");
var channel = await hubConnection.StreamAsync<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout();
@ -284,7 +284,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
await hubConnection.StartAsync();
var channel = hubConnection.Stream<string>("Foo");
var channel = await hubConnection.StreamAsync<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout();