Rename StreamAsync to StreamAsChannelAsync (#1587)

This commit is contained in:
BrennanConroy 2018-03-13 15:04:11 -07:00 committed by GitHub
parent fb6121399c
commit 62956530ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 47 deletions

View File

@ -157,16 +157,16 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new Subscription(invocationHandler, invocationList); return new Subscription(invocationHandler, invocationList);
} }
public async Task<ChannelReader<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) public async Task<ChannelReader<object>> StreamAsChannelAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
{ {
return await StreamAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync(); return await StreamAsChannelAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
} }
private async Task<ChannelReader<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken) private async Task<ChannelReader<object>> StreamAsChannelAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{ {
if (!_startCalled) if (!_startCalled)
{ {
throw new InvalidOperationException($"The '{nameof(StreamAsync)}' method cannot be called before the connection has been started."); throw new InvalidOperationException($"The '{nameof(StreamAsChannelAsync)}' method cannot be called before the connection has been started.");
} }
var invokeCts = new CancellationTokenSource(); var invokeCts = new CancellationTokenSource();

View File

@ -10,69 +10,69 @@ namespace Microsoft.AspNetCore.SignalR.Client
{ {
public static partial class HubConnectionExtensions public static partial class HubConnectionExtensions
{ {
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
} }
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default) public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{ {
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken); return hubConnection.StreamAsChannelAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
} }
public static async Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default) public static async Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
{ {
if (hubConnection == null) if (hubConnection == null)
{ {
throw new ArgumentNullException(nameof(hubConnection)); throw new ArgumentNullException(nameof(hubConnection));
} }
var inputChannel = await hubConnection.StreamAsync(methodName, typeof(TResult), args, cancellationToken); var inputChannel = await hubConnection.StreamAsChannelAsync(methodName, typeof(TResult), args, cancellationToken);
var outputChannel = Channel.CreateUnbounded<TResult>(); var outputChannel = Channel.CreateUnbounded<TResult>();
// Local function to provide a way to run async code as fire-and-forget // Local function to provide a way to run async code as fire-and-forget

View File

@ -157,7 +157,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public override ValueTask<bool> StreamItem(object item) public override ValueTask<bool> StreamItem(object item)
{ {
Log.StreamItemOnNonStreamInvocation(Logger, InvocationId); Log.StreamItemOnNonStreamInvocation(Logger, InvocationId);
_completionSource.TrySetException(new InvalidOperationException($"Streaming hub methods must be invoked with the '{nameof(HubConnection)}.{nameof(HubConnection.StreamAsync)}' method.")); _completionSource.TrySetException(new InvalidOperationException($"Streaming hub methods must be invoked with the '{nameof(HubConnection)}.{nameof(HubConnection.StreamAsChannelAsync)}' method."));
// We "delivered" the stream item successfully as far as the caller cares // We "delivered" the stream item successfully as far as the caller cares
return new ValueTask<bool>(true); return new ValueTask<bool>(true);

View File

@ -300,7 +300,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("Stream", 5).OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("Stream", 5).OrTimeout();
var results = await channel.ReadAllAsync().OrTimeout(); var results = await channel.ReadAllAsync().OrTimeout();
Assert.Equal(new[] { 0, 1, 2, 3, 4 }, results.ToArray()); Assert.Equal(new[] { 0, 1, 2, 3, 4 }, results.ToArray());
@ -331,7 +331,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var channel = await connection.StreamAsync<int>("Stream", 1000, cts.Token).OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("Stream", 1000, cts.Token).OrTimeout();
await channel.WaitToReadAsync().AsTask().OrTimeout(); await channel.WaitToReadAsync().AsTask().OrTimeout();
cts.Cancel(); cts.Cancel();
@ -367,7 +367,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
cts.Cancel(); cts.Cancel();
var channel = await connection.StreamAsync<int>("Stream", 5, cts.Token).OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("Stream", 5, cts.Token).OrTimeout();
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => channel.WaitToReadAsync().AsTask().OrTimeout()); await Assert.ThrowsAnyAsync<OperationCanceledException>(() => channel.WaitToReadAsync().AsTask().OrTimeout());
} }
@ -394,7 +394,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try try
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("StreamException").OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("StreamException").OrTimeout();
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout()); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout());
Assert.Equal("An unexpected error occurred invoking 'StreamException' on the server. InvalidOperationException: Error occurred while streaming.", ex.Message); Assert.Equal("An unexpected error occurred invoking 'StreamException' on the server. InvalidOperationException: Error occurred while streaming.", ex.Message);
@ -504,7 +504,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("!@#$%"); var channel = await connection.StreamAsChannelAsync<int>("!@#$%");
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout()); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout());
Assert.Equal("Unknown hub method '!@#$%'", ex.Message); Assert.Equal("Unknown hub method '!@#$%'", ex.Message);
} }
@ -533,7 +533,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("Stream", 42, 42); var channel = await connection.StreamAsChannelAsync<int>("Stream", 42, 42);
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout()); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout());
Assert.Equal("Failed to invoke 'Stream'. Invocation provides 2 argument(s) but target expects 1.", ex.Message); Assert.Equal("Failed to invoke 'Stream'. Invocation provides 2 argument(s) but target expects 1.", ex.Message);
} }
@ -561,7 +561,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("Stream", "xyz"); var channel = await connection.StreamAsChannelAsync<int>("Stream", "xyz");
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout()); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync().OrTimeout());
Assert.StartsWith("Failed to invoke 'Stream'. Error binding arguments. Make sure that the types of the provided values match the types of the hub method being invoked.", ex.Message); Assert.StartsWith("Failed to invoke 'Stream'. Error binding arguments. Make sure that the types of the provided values match the types of the hub method being invoked.", ex.Message);
} }
@ -588,7 +588,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try try
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("HelloWorld").OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("HelloWorld").OrTimeout();
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync()).OrTimeout(); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync()).OrTimeout();
Assert.Equal("The client attempted to invoke the non-streaming 'HelloWorld' method in a streaming fashion.", ex.Message); Assert.Equal("The client attempted to invoke the non-streaming 'HelloWorld' method in a streaming fashion.", ex.Message);
} }
@ -642,7 +642,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try try
{ {
await connection.StartAsync().OrTimeout(); await connection.StartAsync().OrTimeout();
var channel = await connection.StreamAsync<int>("StreamBroken").OrTimeout(); var channel = await connection.StreamAsChannelAsync<int>("StreamBroken").OrTimeout();
var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync()).OrTimeout(); var ex = await Assert.ThrowsAsync<HubException>(() => channel.ReadAllAsync()).OrTimeout();
Assert.Equal("The value returned by the streaming method 'StreamBroken' is null, does not implement the IObservable<> interface or is not a ReadableChannel<>.", ex.Message); Assert.Equal("The value returned by the streaming method 'StreamBroken' is null, does not implement the IObservable<> interface or is not a ReadableChannel<>.", ex.Message);
} }

View File

@ -96,7 +96,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await hubConnection.StartAsync(); await hubConnection.StartAsync();
var channel = await hubConnection.StreamAsync<object>("Foo"); var channel = await hubConnection.StreamAsChannelAsync<object>("Foo");
// skip negotiation // skip negotiation
await connection.ReadSentTextMessageAsync().OrTimeout(); await connection.ReadSentTextMessageAsync().OrTimeout();
@ -146,7 +146,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await hubConnection.StartAsync(); await hubConnection.StartAsync();
var channel = await hubConnection.StreamAsync<int>("Foo"); var channel = await hubConnection.StreamAsChannelAsync<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
@ -213,7 +213,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await hubConnection.StartAsync(); await hubConnection.StartAsync();
var channel = await hubConnection.StreamAsync<string>("Foo"); var channel = await hubConnection.StreamAsChannelAsync<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, result = "Oops" }).OrTimeout();
@ -236,7 +236,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await hubConnection.StartAsync(); await hubConnection.StartAsync();
var channel = await hubConnection.StreamAsync<int>("Foo"); var channel = await hubConnection.StreamAsChannelAsync<int>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3, error = "An error occurred" }).OrTimeout();
@ -264,7 +264,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 42 }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 42 }).OrTimeout();
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => invokeTask).OrTimeout(); var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => invokeTask).OrTimeout();
Assert.Equal("Streaming hub methods must be invoked with the 'HubConnection.StreamAsync' method.", ex.Message); Assert.Equal("Streaming hub methods must be invoked with the 'HubConnection.StreamAsChannelAsync' method.", ex.Message);
} }
finally finally
{ {
@ -282,7 +282,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{ {
await hubConnection.StartAsync(); await hubConnection.StartAsync();
var channel = await hubConnection.StreamAsync<string>("Foo"); var channel = await hubConnection.StreamAsChannelAsync<string>("Foo");
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).OrTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout(); await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "2" }).OrTimeout();

View File

@ -142,7 +142,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await hubConnection.StartAsync(); await hubConnection.StartAsync();
await hubConnection.DisposeAsync(); await hubConnection.DisposeAsync();
var exception = await Assert.ThrowsAsync<InvalidOperationException>( var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.StreamAsync<int>("test")); () => hubConnection.StreamAsChannelAsync<int>("test"));
Assert.Equal("Connection has been terminated.", exception.Message); Assert.Equal("Connection has been terminated.", exception.Message);
} }
@ -154,9 +154,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory()); var hubConnection = new HubConnection(connection, new JsonHubProtocol(), new LoggerFactory());
var exception = await Assert.ThrowsAsync<InvalidOperationException>( var exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => hubConnection.StreamAsync<int>("test")); () => hubConnection.StreamAsChannelAsync<int>("test"));
Assert.Equal("The 'StreamAsync' method cannot be called before the connection has been started.", exception.Message); Assert.Equal("The 'StreamAsChannelAsync' method cannot be called before the connection has been started.", exception.Message);
} }
[Fact] [Fact]