From 48d97bf271b0d1a025bd4cf1ec6ec6fc88eff77f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Mon, 18 Sep 2017 16:44:18 -0700 Subject: [PATCH] Turned Stream into StreamAsync (#902) * Turned Stream into StreamAsync - Before we were fire and forgetting the invocation that initiated the streaming, this changes that so that the caller now has to await to get the channel. #899 --- .../HubConnection.cs | 9 +++++++-- .../HubConnectionExtensions.cs | 8 ++++---- .../HubConnectionTests.cs | 3 ++- .../HubConnectionProtocolTests.cs | 10 +++++----- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs index 956c9bd97c..d6c542fbab 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs @@ -131,10 +131,15 @@ namespace Microsoft.AspNetCore.SignalR.Client _handlers.AddOrUpdate(methodName, invocationHandler, (_, __) => invocationHandler); } - public ReadableChannel Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args) + public async Task> StreamAsync(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args) + { + return await StreamAsyncCore(methodName, returnType, cancellationToken).ForceAsync(); + } + + private async Task> StreamAsyncCore(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args) { var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var channel); - _ = InvokeCore(methodName, irq, args, nonBlocking: false); + await InvokeCore(methodName, irq, args, nonBlocking: false); return channel; } diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs index 89667ca7c1..5516415ea2 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs @@ -41,17 +41,17 @@ namespace Microsoft.AspNetCore.SignalR.Client return hubConnection.SendAsync(methodName, CancellationToken.None, args); } - public static ReadableChannel Stream(this HubConnection hubConnection, string methodName, params object[] args) => - Stream(hubConnection, methodName, CancellationToken.None, args); + public static Task> StreamAsync(this HubConnection hubConnection, string methodName, params object[] args) => + StreamAsync(hubConnection, methodName, CancellationToken.None, args); - public static ReadableChannel Stream(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken, params object[] args) + public static async Task> StreamAsync(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken, params object[] args) { if (hubConnection == null) { throw new ArgumentNullException(nameof(hubConnection)); } - var inputChannel = hubConnection.Stream(methodName, typeof(TResult), cancellationToken, args); + var inputChannel = await hubConnection.StreamAsync(methodName, typeof(TResult), cancellationToken, args); var outputChannel = Channel.CreateUnbounded(); // Local function to provide a way to run async code as fire-and-forget diff --git a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs index debddd6198..e92ee22a3f 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs @@ -177,7 +177,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests var tcs = new TaskCompletionSource(); - var results = await connection.Stream("Stream").ReadAllAsync().OrTimeout(); + var channel = await connection.StreamAsync("Stream"); + var results = await channel.ReadAllAsync().OrTimeout(); Assert.Equal(new[] { "a", "b", "c" }, results.ToArray()); } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs index d18ac39e14..c9a8928b4a 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs @@ -98,7 +98,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { await hubConnection.StartAsync(); - var channel = hubConnection.Stream("Foo"); + var channel = await hubConnection.StreamAsync("Foo"); // skip negotiation await connection.ReadSentTextMessageAsync().OrTimeout(); @@ -148,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { await hubConnection.StartAsync(); - var channel = hubConnection.Stream("Foo"); + var channel = await hubConnection.StreamAsync("Foo"); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout(); @@ -192,7 +192,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { await hubConnection.StartAsync(); - var channel = hubConnection.Stream("Foo"); + var channel = await hubConnection.StreamAsync("Foo"); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout(); @@ -238,7 +238,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { await hubConnection.StartAsync(); - var channel = hubConnection.Stream("Foo"); + var channel = await hubConnection.StreamAsync("Foo"); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout(); @@ -284,7 +284,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { await hubConnection.StartAsync(); - var channel = hubConnection.Stream("Foo"); + var channel = await hubConnection.StreamAsync("Foo"); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout();