Microsoft.AspNetCore.SignalR.Client refactor (#2000)

This commit is contained in:
James Newton-King 2018-04-14 21:09:41 +12:00 committed by GitHub
parent 3f0a6ebc0d
commit e4d0f2980e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 270 additions and 220 deletions

View File

@ -36,6 +36,8 @@ namespace Microsoft.AspNetCore.SignalR.Crankier
Trace.WriteLine($"Connection terminated with error: {ex.GetType()}: {ex.Message}");
_connectionState = ConnectionState.Faulted;
}
return Task.CompletedTask;
};
_sendCts = new CancellationTokenSource();

View File

@ -11,6 +11,7 @@ using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.AspNetCore.SignalR.Protocol;
@ -44,12 +45,17 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_tcs = new TaskCompletionSource<ReadResult>();
_pipe.AddReadResult(new ValueTask<ReadResult>(_tcs.Task));
var protocol = Protocol == "json" ? (IHubProtocol)new JsonHubProtocol() : new MessagePackHubProtocol();
var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(protocol);
hubConnectionBuilder.WithConnectionFactory(format =>
if (Protocol == "json")
{
// JSON protocol added by default
}
else
{
hubConnectionBuilder.AddMessagePackProtocol();
}
var delegateConnectionFactory = new DelegateConnectionFactory(format =>
{
var connection = new DefaultConnectionContext();
// prevents keep alive time being activated
@ -63,6 +69,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
connection.Transport.Input.Complete();
return Task.CompletedTask;
});
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
_hubConnection = hubConnectionBuilder.Build();
_hubConnection.StartAsync().GetAwaiter().GetResult();
@ -90,7 +97,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark]
public Task SendAsync()
{
return _hubConnection.SendAsync("Dummy", _arguments);
return _hubConnection.SendCoreAsync("Dummy", _arguments);
}
}
}

View File

@ -12,6 +12,8 @@ using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Microbenchmarks.Shared;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
@ -38,8 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_pipe = new TestDuplexPipe();
var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(new JsonHubProtocol());
hubConnectionBuilder.WithConnectionFactory(format =>
var delegateConnectionFactory = new DelegateConnectionFactory(format =>
{
var connection = new DefaultConnectionContext();
// prevents keep alive time being activated
@ -53,6 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
connection.Transport.Input.Complete();
return Task.CompletedTask;
});
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
_hubConnection = hubConnectionBuilder.Build();
}

View File

@ -32,7 +32,7 @@ namespace ClientSample
var uri = baseUrl == null ? new Uri("net.tcp://127.0.0.1:9001") : new Uri(baseUrl);
Console.WriteLine("Connecting to {0}", uri);
var connectionBuilder = new HubConnectionBuilder()
.WithLogging(logging =>
.ConfigureLogging(logging =>
{
logging.AddConsole();
});
@ -65,6 +65,7 @@ namespace ClientSample
closedTokenSource.Cancel();
Console.WriteLine("Connection closed...");
return Task.CompletedTask;
};
while (true)

View File

@ -1,5 +1,6 @@
using System;
using System.Net;
using System.Threading.Tasks;
using ClientSample;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.DependencyInjection;
@ -31,12 +32,29 @@ namespace Microsoft.AspNetCore.SignalR.Client
public static IHubConnectionBuilder WithEndPoint(this IHubConnectionBuilder builder, EndPoint endPoint)
{
builder.WithConnectionFactory(
format => new TcpConnection(endPoint).StartAsync(),
connection => ((TcpConnection)connection).DisposeAsync()
);
builder.Services.AddSingleton<IConnectionFactory>(new TcpConnectionFactory(endPoint));
return builder;
}
private class TcpConnectionFactory : IConnectionFactory
{
private readonly EndPoint _endPoint;
public TcpConnectionFactory(EndPoint endPoint)
{
_endPoint = endPoint;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{
return new TcpConnection(_endPoint).StartAsync();
}
public Task DisposeAsync(ConnectionContext connection)
{
return ((TcpConnection)connection).DisposeAsync();
}
}
}
}

View File

@ -40,7 +40,11 @@ namespace JwtClientSample
.Build();
var closedTcs = new TaskCompletionSource<object>();
hubConnection.Closed += e => closedTcs.SetResult(null);
hubConnection.Closed += e =>
{
closedTcs.SetResult(null);
return Task.CompletedTask;
};
hubConnection.On<string, string>("Message", (sender, message) => Console.WriteLine($"[{userId}] {sender}: {message}"));
await hubConnection.StartAsync();

View File

@ -283,7 +283,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
}
else if (context.Response.StatusCode == StatusCodes.Status204NoContent)
{
// Don't poll if the transport task was cancelled
// Don't poll if the transport task was canceled
pollAgain = false;
}

View File

@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
}
// We're intentionally not checking cancellation here because we need to drain messages we've got so far,
// but it's too late to emit the 204 required by being cancelled.
// but it's too late to emit the 204 required by being canceled.
Log.LongPollingWritingMessage(_logger, buffer.Length);

View File

@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Transient state to a connection
private ConnectionState _connectionState;
public event Action<Exception> Closed;
public event Func<Exception, Task> Closed;
/// <summary>
/// Gets or sets the server timeout interval for the connection. Changes to this value
@ -51,10 +51,15 @@ namespace Microsoft.AspNetCore.SignalR.Client
public TimeSpan HandshakeTimeout { get; set; } = DefaultHandshakeTimeout;
public HubConnection(IConnectionFactory connectionFactory, IHubProtocol protocol, IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
: this(connectionFactory, protocol, loggerFactory)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
public HubConnection(IConnectionFactory connectionFactory, IHubProtocol protocol, ILoggerFactory loggerFactory)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_protocol = protocol ?? throw new ArgumentNullException(nameof(protocol));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
@ -66,7 +71,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
await StartAsyncCore(cancellationToken).ForceAsync();
}
public async Task StopAsync()
public async Task StopAsync(CancellationToken cancellationToken = default)
{
CheckDisposed();
await StopAsyncCore(disposing: false).ForceAsync();
@ -101,16 +106,16 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new Subscription(invocationHandler, invocationList);
}
public async Task<ChannelReader<object>> StreamAsChannelAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await StreamAsChannelAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
public async Task<ChannelReader<object>> StreamAsChannelCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
public async Task<object> InvokeAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await InvokeAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
public async Task<object> InvokeCoreAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) =>
await InvokeCoreAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
// REVIEW: We don't generally use cancellation tokens when writing to a pipe because the asynchrony is only the result of backpressure.
// However, this would be the only "invocation" method _without_ a cancellation token... which is odd.
public async Task SendAsync(string methodName, object[] args, CancellationToken cancellationToken = default) =>
await SendAsyncCore(methodName, args, cancellationToken).ForceAsync();
public async Task SendCoreAsync(string methodName, object[] args, CancellationToken cancellationToken = default) =>
await SendCoreAsyncCore(methodName, args, cancellationToken).ForceAsync();
private async Task StartAsyncCore(CancellationToken cancellationToken)
{
@ -209,9 +214,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private async Task<ChannelReader<object>> StreamAsChannelAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
async Task OnStreamCancelled(InvocationRequest irq)
async Task OnStreamCanceled(InvocationRequest irq)
{
// We need to take the connection lock in order to ensure we a) have a connection and b) are the only one accessing the write end of the pipe.
await WaitConnectionLockAsync();
@ -245,14 +250,14 @@ namespace Microsoft.AspNetCore.SignalR.Client
try
{
CheckDisposed();
CheckConnectionActive(nameof(StreamAsChannelAsync));
CheckConnectionActive(nameof(StreamAsChannelCoreAsync));
var irq = InvocationRequest.Stream(cancellationToken, returnType, _connectionState.GetNextId(), _loggerFactory, this, out channel);
await InvokeStreamCore(methodName, irq, args, cancellationToken);
if (cancellationToken.CanBeCanceled)
{
cancellationToken.Register(state => _ = OnStreamCancelled((InvocationRequest)state), irq);
cancellationToken.Register(state => _ = OnStreamCanceled((InvocationRequest)state), irq);
}
}
finally
@ -264,7 +269,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
private async Task<object> InvokeAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
private async Task<object> InvokeCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
CheckDisposed();
await WaitConnectionLockAsync();
@ -273,7 +278,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
try
{
CheckDisposed();
CheckConnectionActive(nameof(InvokeAsync));
CheckConnectionActive(nameof(InvokeCoreAsync));
var irq = InvocationRequest.Invoke(cancellationToken, returnType, _connectionState.GetNextId(), _loggerFactory, this, out invocationTask);
await InvokeCore(methodName, irq, args, cancellationToken);
@ -351,13 +356,13 @@ namespace Microsoft.AspNetCore.SignalR.Client
Log.SendingMessage(_logger, hubMessage);
// REVIEW: If a token is passed in and is cancelled during FlushAsync it seems to break .Complete()...
// REVIEW: If a token is passed in and is canceled during FlushAsync it seems to break .Complete()...
await _connectionState.Connection.Transport.Output.FlushAsync();
Log.MessageSent(_logger, hubMessage);
}
private async Task SendAsyncCore(string methodName, object[] args, CancellationToken cancellationToken)
private async Task SendCoreAsyncCore(string methodName, object[] args, CancellationToken cancellationToken)
{
CheckDisposed();
@ -365,7 +370,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
try
{
CheckDisposed();
CheckConnectionActive(nameof(SendAsync));
CheckConnectionActive(nameof(SendCoreAsync));
Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length);
@ -592,7 +597,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
if (result.IsCanceled)
{
// We were cancelled. Possibly because we were stopped gracefully
// We were canceled. Possibly because we were stopped gracefully
break;
}
else if (!buffer.IsEmpty)
@ -685,7 +690,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
private async Task RunClosedEvent(Action<Exception> closed, Exception closeException)
private async Task RunClosedEvent(Func<Exception, Task> closed, Exception closeException)
{
// Dispatch to the thread pool before we invoke the user callback
await AwaitableThreadPool.Yield();
@ -693,7 +698,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
try
{
Log.InvokingClosedEventHandler(_logger);
closed.Invoke(closeException);
await closed.Invoke(closeException);
}
catch (Exception ex)
{

View File

@ -2,61 +2,17 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.SignalR.Client
{
public static class HubConnectionBuilderExtensions
{
public static IHubConnectionBuilder WithConnectionFactory(this IHubConnectionBuilder hubConnectionBuilder,
Func<TransferFormat, Task<ConnectionContext>> connectionFactory,
Func<ConnectionContext, Task> disposeCallback)
{
if (connectionFactory == null)
{
throw new ArgumentNullException(nameof(connectionFactory));
}
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(new DelegateConnectionFactory(connectionFactory, disposeCallback));
return hubConnectionBuilder;
}
public static IHubConnectionBuilder WithHubProtocol(this IHubConnectionBuilder hubConnectionBuilder, IHubProtocol hubProtocol)
{
hubConnectionBuilder.Services.AddSingleton(hubProtocol);
return hubConnectionBuilder;
}
public static IHubConnectionBuilder WithLogging(this IHubConnectionBuilder hubConnectionBuilder, Action<ILoggingBuilder> configureLogging)
public static IHubConnectionBuilder ConfigureLogging(this IHubConnectionBuilder hubConnectionBuilder, Action<ILoggingBuilder> configureLogging)
{
hubConnectionBuilder.Services.AddLogging(configureLogging);
return hubConnectionBuilder;
}
private class DelegateConnectionFactory : IConnectionFactory
{
private readonly Func<TransferFormat, Task<ConnectionContext>> _connectionFactory;
private readonly Func<ConnectionContext, Task> _disposeCallback;
public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectionFactory, Func<ConnectionContext, Task> disposeCallback)
{
_connectionFactory = connectionFactory;
_disposeCallback = disposeCallback;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{
return _connectionFactory(transferFormat);
}
public Task DisposeAsync(ConnectionContext connection)
{
return _disposeCallback(connection);
}
}
}
}

View File

@ -13,67 +13,67 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, Array.Empty<object>(), cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, Array.Empty<object>(), cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
public static Task InvokeAsync(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
public static Task InvokeCoreAsync(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (hubConnection == null)
{
throw new ArgumentNullException(nameof(hubConnection));
}
return hubConnection.InvokeAsync(methodName, typeof(object), args, cancellationToken);
return hubConnection.InvokeCoreAsync(methodName, typeof(object), args, cancellationToken);
}
}
}

View File

@ -13,67 +13,67 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
public static Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return hubConnection.InvokeAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
return hubConnection.InvokeCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
public static async Task<TResult> InvokeAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
public static async Task<TResult> InvokeCoreAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (hubConnection == null)
{
throw new ArgumentNullException(nameof(hubConnection));
}
return (TResult)await hubConnection.InvokeAsync(methodName, typeof(TResult), args, cancellationToken);
return (TResult)await hubConnection.InvokeCoreAsync(methodName, typeof(TResult), args, cancellationToken);
}
}

View File

@ -11,57 +11,57 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static Task SendAsync(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, Array.Empty<object>(), cancellationToken);
return hubConnection.SendCoreAsync(methodName, Array.Empty<object>(), cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
public static Task SendAsync(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return hubConnection.SendAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
return hubConnection.SendCoreAsync(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
}
}

View File

@ -12,67 +12,67 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
public static Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsChannelAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
return hubConnection.StreamAsChannelCoreAsync<TResult>(methodName, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
public static async Task<ChannelReader<TResult>> StreamAsChannelAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
public static async Task<ChannelReader<TResult>> StreamAsChannelCoreAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (hubConnection == null)
{
throw new ArgumentNullException(nameof(hubConnection));
}
var inputChannel = await hubConnection.StreamAsChannelAsync(methodName, typeof(TResult), args, cancellationToken);
var inputChannel = await hubConnection.StreamAsChannelCoreAsync(methodName, typeof(TResult), args, cancellationToken);
var outputChannel = Channel.CreateUnbounded<TResult>();
// Local function to provide a way to run async code as fire-and-forget

View File

@ -157,7 +157,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public override ValueTask<bool> StreamItem(object item)
{
Log.StreamItemOnNonStreamInvocation(Logger, InvocationId);
_completionSource.TrySetException(new InvalidOperationException($"Streaming hub methods must be invoked with the '{nameof(HubConnection)}.{nameof(HubConnection.StreamAsChannelAsync)}' method."));
_completionSource.TrySetException(new InvalidOperationException($"Streaming hub methods must be invoked with the '{nameof(HubConnection)}.{nameof(HubConnectionExtensions.StreamAsChannelAsync)}' method."));
// We "delivered" the stream item successfully as far as the caller cares
return new ValueTask<bool>(true);

View File

@ -14,6 +14,7 @@ using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Xunit;
@ -49,10 +50,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
ILoggerFactory loggerFactory = null)
{
var hubConnectionBuilder = new HubConnectionBuilder();
hubConnectionBuilder.WithHubProtocol(protocol);
hubConnectionBuilder.Services.AddSingleton(protocol);
hubConnectionBuilder.WithLoggerFactory(loggerFactory);
hubConnectionBuilder.WithConnectionFactory(GetHttpConnectionFactory(loggerFactory, path, transportType ?? HttpTransportType.LongPolling | HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents),
connection => ((HttpConnection)connection).DisposeAsync());
var delegateConnectionFactory = new DelegateConnectionFactory(
GetHttpConnectionFactory(loggerFactory, path, transportType ?? HttpTransportType.LongPolling | HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents),
connection => ((HttpConnection)connection).DisposeAsync());
hubConnectionBuilder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
return hubConnectionBuilder.Build();
}
@ -74,11 +78,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
var protocol = HubProtocols[protocolName];
using (StartLog(out var loggerFactory, $"{nameof(CheckFixedMessage)}_{protocol.Name}_{transportType}_{path.TrimStart('/')}"))
{
var connection = new HubConnectionBuilder()
var connectionBuilder = new HubConnectionBuilder()
.WithLoggerFactory(loggerFactory)
.WithHubProtocol(protocol)
.WithUrl(_serverFixture.Url + path, transportType)
.Build();
.WithUrl(_serverFixture.Url + path, transportType);
connectionBuilder.Services.AddSingleton(protocol);
var connection = connectionBuilder.Build();
try
{
@ -303,6 +308,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
closeTcs.SetResult(null);
}
return Task.CompletedTask;
};
try
@ -755,7 +761,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try
{
await hubConnection.StartAsync().OrTimeout();
var headerValues = await hubConnection.InvokeAsync<string[]>(nameof(TestHub.GetHeaderValues), new object[] { new[] { "X-test", "X-42" } }).OrTimeout();
var headerValues = await hubConnection.InvokeAsync<string[]>(nameof(TestHub.GetHeaderValues), new[] { "X-test", "X-42" }).OrTimeout();
Assert.Equal(new[] { "42", "test" }, headerValues);
}
catch (Exception ex)
@ -790,7 +796,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
try
{
await hubConnection.StartAsync().OrTimeout();
var cookieValue = await hubConnection.InvokeAsync<string>(nameof(TestHub.GetCookieValue), new object[] { "Foo" }).OrTimeout();
var cookieValue = await hubConnection.InvokeAsync<string>(nameof(TestHub.GetCookieValue), "Foo").OrTimeout();
Assert.Equal("Bar", cookieValue);
}
catch (Exception ex)
@ -847,11 +853,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
using (StartLog(out var loggerFactory))
{
var hubConnection = new HubConnectionBuilder()
var hubConnectionBuilder = new HubConnectionBuilder()
.WithLoggerFactory(loggerFactory)
.WithHubProtocol(new MessagePackHubProtocol())
.WithUrl(_serverFixture.Url + "/default-nowebsockets")
.Build();
.AddMessagePackProtocol()
.WithUrl(_serverFixture.Url + "/default-nowebsockets");
var hubConnection = hubConnectionBuilder.Build();
try
{
await hubConnection.StartAsync().OrTimeout();

View File

@ -258,7 +258,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.StartAsync(TransferFormat.Text).OrTimeout();
await connection.DisposeAsync().OrTimeout();
// This will throw OperationCancelledException if it's forcibly terminated
// This will throw OperationCanceledException if it's forcibly terminated
// which we don't want
await transport.Receiving.OrTimeout();
});

View File

@ -67,21 +67,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Assert.NotNull(loggerFactory);
}
[Fact]
public void WithHubProtocolAddsProtocol()
{
var hubProtocol = Mock.Of<IHubProtocol>();
var connectionBuilder = new HubConnectionBuilder();
connectionBuilder.WithHubProtocol(hubProtocol);
var serviceProvider = connectionBuilder.Services.BuildServiceProvider();
var resolvedHubProtocol = serviceProvider.GetService<IHubProtocol>();
Assert.Same(hubProtocol, resolvedHubProtocol);
}
[Fact]
public void AddJsonProtocolAddsProtocol()
{

View File

@ -4,8 +4,11 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using Xunit;
@ -45,29 +48,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Assert.Equal("JUST A TEST", actualProtocol.PayloadSerializer.DateFormatString);
}
[Fact]
public async Task WithConnectionFactorySetsConnectionFactory()
{
var called = false;
Func<TransferFormat, Task<ConnectionContext>> connectionFactory = format =>
{
called = true;
return Task.FromResult<ConnectionContext>(null);
};
var serviceProvider = new HubConnectionBuilder().WithConnectionFactory(connectionFactory, connection => Task.CompletedTask).Services.BuildServiceProvider();
var factory = serviceProvider.GetService<IConnectionFactory>();
Assert.NotNull(factory);
Assert.False(called);
await factory.ConnectAsync(TransferFormat.Text);
Assert.True(called);
}
[Fact]
public void BuildCanOnlyBeCalledOnce()
{
var builder = new HubConnectionBuilder().WithConnectionFactory(format => null, connection => Task.CompletedTask);
var builder = new HubConnectionBuilder();
builder.Services.AddSingleton<IConnectionFactory>(new HttpConnectionFactory(Options.Create(new HttpConnectionOptions()), NullLoggerFactory.Instance));
Assert.NotNull(builder.Build());

View File

@ -5,6 +5,8 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json.Linq;
using Xunit;
@ -17,9 +19,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// This tactic (using names and a dictionary) allows non-serializable data (like a Func) to be used in a theory AND get it to show in the new hierarchical view in Test Explorer as separate tests you can run individually.
private static readonly IDictionary<string, Func<HubConnection, Task>> MethodsThatRequireActiveConnection = new Dictionary<string, Func<HubConnection, Task>>()
{
{ nameof(HubConnection.InvokeAsync), (connection) => connection.InvokeAsync("Foo") },
{ nameof(HubConnection.SendAsync), (connection) => connection.SendAsync("Foo") },
{ nameof(HubConnection.StreamAsChannelAsync), (connection) => connection.StreamAsChannelAsync<object>("Foo") },
{ nameof(HubConnection.InvokeCoreAsync), (connection) => connection.InvokeAsync("Foo") },
{ nameof(HubConnection.SendCoreAsync), (connection) => connection.SendAsync("Foo") },
{ nameof(HubConnection.StreamAsChannelCoreAsync), (connection) => connection.StreamAsChannelAsync<object>("Foo") },
};
public static IEnumerable<object[]> MethodsNamesThatRequireActiveConnection => MethodsThatRequireActiveConnection.Keys.Select(k => new object[] { k });
@ -27,14 +29,22 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private HubConnection CreateHubConnection(TestConnection testConnection)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => testConnection.StartAsync(format), connection => ((TestConnection)connection).DisposeAsync());
var delegateConnectionFactory = new DelegateConnectionFactory(
testConnection.StartAsync,
connection => ((TestConnection)connection).DisposeAsync());
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
return builder.Build();
}
private HubConnection CreateHubConnection(Func<TransferFormat, Task<ConnectionContext>> connectionFactory, Func<ConnectionContext, Task> disposeCallback)
private HubConnection CreateHubConnection(Func<TransferFormat, Task<ConnectionContext>> connectDelegate, Func<ConnectionContext, Task> disposeDelegate)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => connectionFactory(format), disposeCallback);
var delegateConnectionFactory = new DelegateConnectionFactory(connectDelegate, disposeDelegate);
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
return builder.Build();
}
@ -195,7 +205,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await startTask;
// We need some special logic to ensure InvokeAsync completes.
if (string.Equals(name, nameof(HubConnection.InvokeAsync)))
if (string.Equals(name, nameof(HubConnection.InvokeCoreAsync)))
{
await ForceLastInvocationToComplete(testConnection);
}
@ -253,7 +263,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var closed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
connection.Closed += (e) => closed.TrySetResult(null);
connection.Closed += (e) =>
{
closed.TrySetResult(null);
return Task.CompletedTask;
};
await connection.StartAsync().OrTimeout();
Assert.True(testConnection.Started.IsCompleted);
@ -263,7 +277,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// We should be stopped now
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo").OrTimeout());
Assert.Equal($"The '{nameof(HubConnection.SendAsync)}' method cannot be called if the connection is not active", ex.Message);
Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
});
}
@ -277,7 +291,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
// We're hooking the TestConnection shutting down here because the HubConnection one will be blocked on the lock
testConnection.Transport.Input.OnWriterCompleted((_, __) => testConnectionClosed.TrySetResult(null), null);
connection.Closed += (e) => connectionClosed.TrySetResult(null);
connection.Closed += (e) =>
{
connectionClosed.TrySetResult(null);
return Task.CompletedTask;
};
await connection.StartAsync().OrTimeout();
Assert.True(testConnection.Started.IsCompleted);
@ -297,7 +315,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// We should be stopped now
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo").OrTimeout());
Assert.Equal($"The '{nameof(HubConnection.SendAsync)}' method cannot be called if the connection is not active", ex.Message);
Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
await testConnection.Disposed.OrTimeout();
@ -312,7 +330,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connectionClosed = new TaskCompletionSource<object>();
await AsyncUsing(CreateHubConnection(testConnection), async connection =>
{
connection.Closed += (e) => connectionClosed.TrySetResult(null);
connection.Closed += (e) =>
{
connectionClosed.TrySetResult(null);
return Task.CompletedTask;
};
await connection.StartAsync().OrTimeout();
Assert.True(testConnection.Started.IsCompleted);
@ -331,7 +353,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// We should be stopped now
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => connection.SendAsync("Foo").OrTimeout());
Assert.Equal($"The '{nameof(HubConnection.SendAsync)}' method cannot be called if the connection is not active", ex.Message);
Assert.Equal($"The '{nameof(HubConnection.SendCoreAsync)}' method cannot be called if the connection is not active", ex.Message);
});
}

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
@ -10,19 +9,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
private static HubConnection CreateHubConnection(TestConnection connection, IHubProtocol protocol = null)
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(async format =>
{
await connection.StartAsync(format);
return connection;
},
connecton => ((TestConnection)connection).DisposeAsync());
var delegateConnectionFactory = new DelegateConnectionFactory(
connection.StartAsync,
c => ((TestConnection)c).DisposeAsync());
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
if (protocol != null)
{
builder.WithHubProtocol(protocol);
builder.Services.AddSingleton(protocol);
}
return builder.Build();
}
}
}
}

View File

@ -92,7 +92,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection);
hubConnection.Closed += e => closedTcs.SetResult(e);
hubConnection.Closed += e =>
{
closedTcs.SetResult(e);
return Task.CompletedTask;
};
try
{
@ -117,7 +121,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection);
hubConnection.Closed += e => closedTcs.SetResult(e);
hubConnection.Closed += e =>
{
closedTcs.SetResult(e);
return Task.CompletedTask;
};
try
{

View File

@ -6,6 +6,8 @@ using System.Buffers;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Xunit;
@ -41,12 +43,19 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public async Task ClosedEventRaisedWhenTheClientIsStopped()
{
var builder = new HubConnectionBuilder();
builder.WithConnectionFactory(format => new TestConnection().StartAsync(format),
connection => ((TestConnection)connection).DisposeAsync());
var delegateConnectionFactory = new DelegateConnectionFactory(
format => new TestConnection().StartAsync(format),
connection => ((TestConnection)connection).DisposeAsync());
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
var hubConnection = builder.Build();
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e => closedEventTcs.SetResult(e);
hubConnection.Closed += e =>
{
closedEventTcs.SetResult(e);
return Task.CompletedTask;
};
await hubConnection.StartAsync().OrTimeout();
await hubConnection.StopAsync().OrTimeout();
@ -54,7 +63,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
[Fact]
public async Task PendingInvocationsAreCancelledWhenConnectionClosesCleanly()
public async Task PendingInvocationsAreCanceledWhenConnectionClosesCleanly()
{
var hubConnection = CreateHubConnection(new TestConnection());
@ -88,7 +97,11 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
hubConnection.ServerTimeout = TimeSpan.FromMilliseconds(100);
var closeTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += ex => closeTcs.TrySetResult(ex);
hubConnection.Closed += ex =>
{
closeTcs.TrySetResult(ex);
return Task.CompletedTask;
};
await hubConnection.StartAsync().OrTimeout();

View File

@ -474,7 +474,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
}
[Fact]
public async Task LongPollingTransportRePollsIfRequestCancelled()
public async Task LongPollingTransportRePollsIfRequestCanceled()
{
var numPolls = 0;
var completionTcs = new TaskCompletionSource<object>();

View File

@ -57,7 +57,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
else if (result.IsCanceled)
{
// This is useful for detecting that the connection tried to gracefully terminate.
// If the Receiving task is faulted/cancelled, it means StopAsync was the thing that
// If the Receiving task is faulted/canceled, it means StopAsync was the thing that
// actually terminated the connection (not ideal, we want the transport pipe to
// shut down gracefully)
throw new OperationCanceledException();

View File

@ -9,6 +9,7 @@ using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Xunit;
@ -91,11 +92,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
private static HubConnection CreateConnection(string url, HttpTransportType transportType, IHubProtocol protocol, ILoggerFactory loggerFactory)
{
return new HubConnectionBuilder()
.WithHubProtocol(protocol)
var hubConnectionBuilder = new HubConnectionBuilder()
.WithLoggerFactory(loggerFactory)
.WithUrl(url, transportType)
.Build();
.WithUrl(url, transportType);
hubConnectionBuilder.Services.AddSingleton(protocol);
return hubConnectionBuilder.Build();
}
private static IEnumerable<HttpTransportType> TransportTypes()

View File

@ -0,0 +1,32 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Client;
namespace Microsoft.AspNetCore.SignalR.Tests
{
public class DelegateConnectionFactory : IConnectionFactory
{
private readonly Func<TransferFormat, Task<ConnectionContext>> _connectDelegate;
private readonly Func<ConnectionContext, Task> _disposeDelegate;
public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectDelegate, Func<ConnectionContext, Task> disposeDelegate)
{
_connectDelegate = connectDelegate;
_disposeDelegate = disposeDelegate;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
{
return _connectDelegate(transferFormat);
}
public Task DisposeAsync(ConnectionContext connection)
{
return _disposeDelegate(connection);
}
}
}

View File

@ -430,6 +430,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
closeTcs.SetResult(null);
}
return Task.CompletedTask;
};
logger.LogInformation("Starting connection to {url}", url);