CancellationToken EVERYWHERE (#2101)

This commit is contained in:
Andrew Stanton-Nurse 2018-04-20 16:21:51 -07:00 committed by GitHub
parent 058830c9df
commit 47eafca4d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 205 additions and 175 deletions

View File

@ -6,6 +6,7 @@ using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
@ -82,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
}
public override ValueTask WriteAsync(HubMessage message)
public override ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken)
{
if (message is CompletionMessage completionMessage)
{

View File

@ -1,5 +1,6 @@
using System;
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using ClientSample;
using Microsoft.AspNetCore.Connections;
@ -46,7 +47,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
_endPoint = endPoint;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
return new TcpConnection(_endPoint).StartAsync();
}

View File

@ -121,12 +121,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
_transportFactory = transportFactory;
}
public async Task StartAsync()
public async Task StartAsync(CancellationToken cancellationToken = default)
{
await StartAsync(TransferFormat.Binary);
await StartAsync(TransferFormat.Binary, cancellationToken);
}
public async Task StartAsync(TransferFormat transferFormat)
public async Task StartAsync(TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
await StartAsyncCore(transferFormat).ForceAsync();
}

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.Http.Connections.Internal;
using Microsoft.Extensions.DependencyInjection.Extensions;

View File

@ -4,7 +4,6 @@
using System;
using System.Reflection;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Internal;
using Microsoft.AspNetCore.Routing;

View File

@ -3,7 +3,6 @@
using System.Collections.Generic;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http.Connections.Internal;
namespace Microsoft.AspNetCore.Http.Connections
{

View File

@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.AspNetCore.Http.Connections
{

View File

@ -14,9 +14,7 @@ using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -78,6 +76,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
await StopAsyncCore(disposing: false).ForceAsync();
}
// Current plan for IAsyncDisposable is that DisposeAsync will NOT take a CancellationToken
// https://github.com/dotnet/csharplang/blob/195efa07806284d7b57550e7447dc8bd39c156bf/proposals/async-streams.md#iasyncdisposable
public async Task DisposeAsync()
{
if (!_disposed)
@ -211,7 +211,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Now stop the connection we captured
if (connectionState != null)
{
await connectionState.StopAsync(ServerTimeout);
await connectionState.StopAsync();
}
}
@ -970,7 +970,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
}
public Task StopAsync(TimeSpan timeout)
public Task StopAsync()
{
// We want multiple StopAsync calls on the same connection state
// to wait for the same "stop" to complete.
@ -983,12 +983,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
else
{
_stopTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
return StopAsyncCore(timeout);
return StopAsyncCore();
}
}
}
private async Task StopAsyncCore(TimeSpan timeout)
private async Task StopAsyncCore()
{
Log.Stopping(_hubConnection._logger);

View File

@ -3,14 +3,7 @@
using System;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Client
{

View File

@ -1,11 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Client
{

View File

@ -1,11 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Client
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;

View File

@ -2,9 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.SignalR.Client
{

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
@ -8,8 +9,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public interface IConnectionFactory
{
Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat);
Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat, CancellationToken cancellationToken = default);
// Current plan for IAsyncDisposable is that DisposeAsync will NOT take a CancellationToken
// https://github.com/dotnet/csharplang/blob/195efa07806284d7b57550e7447dc8bd39c156bf/proposals/async-streams.md#iasyncdisposable
Task DisposeAsync(ConnectionContext connection);
}
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Client;
@ -31,10 +32,10 @@ namespace Microsoft.AspNetCore.SignalR.Client
_loggerFactory = loggerFactory;
}
public async Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
public async Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat, CancellationToken cancellationToken = default)
{
var connection = new HttpConnection(_httpConnectionOptions, _loggerFactory);
await connection.StartAsync(transferFormat);
await connection.StartAsync(transferFormat, cancellationToken);
return connection;
}
@ -43,4 +44,4 @@ namespace Microsoft.AspNetCore.SignalR.Client
return ((HttpConnection)connection).DisposeAsync();
}
}
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Text;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR
@ -13,10 +15,11 @@ namespace Microsoft.AspNetCore.SignalR
/// </summary>
/// <param name="clientProxy">The <see cref="IClientProxy"/></param>
/// <param name="method">name of the method to invoke</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method)
public static Task SendAsync(this IClientProxy clientProxy, string method, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, Array.Empty<object>());
return clientProxy.SendCoreAsync(method, Array.Empty<object>(), cancellationToken);
}
/// <summary>
@ -26,10 +29,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="clientProxy">The <see cref="IClientProxy"/></param>
/// <param name="method">name of the method to invoke</param>
/// <param name="arg1">The first argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1 });
return clientProxy.SendCoreAsync(method, new[] { arg1 }, cancellationToken);
}
/// <summary>
@ -40,10 +44,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="method">name of the method to invoke</param>
/// <param name="arg1">The first argument</param>
/// <param name="arg2">The second argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2 }, cancellationToken);
}
/// <summary>
@ -55,10 +60,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg1">The first argument</param>
/// <param name="arg2">The second argument</param>
/// <param name="arg3">The third argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3 }, cancellationToken);
}
/// <summary>
@ -71,10 +77,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg2">The second argument</param>
/// <param name="arg3">The third argument</param>
/// <param name="arg4">The fourth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
/// <summary>
@ -88,10 +95,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg3">The third argument</param>
/// <param name="arg4">The fourth argument</param>
/// <param name="arg5">The fifth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
/// <summary>
@ -106,10 +114,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg4">The fourth argument</param>
/// <param name="arg5">The fifth argument</param>
/// <param name="arg6">The sixth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
/// <summary>
@ -125,10 +134,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg5">The fifth argument</param>
/// <param name="arg6">The sixth argument</param>
/// <param name="arg7">The seventh argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
/// <summary>
@ -145,10 +155,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg6">The sixth argument</param>
/// <param name="arg7">The seventh argument</param>
/// <param name="arg8">The eigth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
/// <summary>
@ -166,10 +177,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg7">The seventh argument</param>
/// <param name="arg8">The eigth argument</param>
/// <param name="arg9">The ninth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
/// <summary>
@ -188,10 +200,11 @@ namespace Microsoft.AspNetCore.SignalR
/// <param name="arg8">The eigth argument</param>
/// <param name="arg9">The ninth argument</param>
/// <param name="arg10">The tenth argument</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10)
public static Task SendAsync(this IClientProxy clientProxy, string method, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 });
return clientProxy.SendCoreAsync(method, new[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
}
}

View File

@ -5,6 +5,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
@ -23,7 +24,7 @@ namespace Microsoft.AspNetCore.SignalR
_logger = logger;
}
public override Task AddToGroupAsync(string connectionId, string groupName)
public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -46,7 +47,7 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
public override Task RemoveFromGroupAsync(string connectionId, string groupName)
public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -69,7 +70,7 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
public override Task SendAllAsync(string methodName, object[] args)
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, null);
}
@ -145,7 +146,7 @@ namespace Microsoft.AspNetCore.SignalR
}
}
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args)
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -166,7 +167,7 @@ namespace Microsoft.AspNetCore.SignalR
return connection.WriteAsync(message).AsTask();
}
public override Task SendGroupAsync(string groupName, string methodName, object[] args)
public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
@ -191,7 +192,7 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args)
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
{
// Each task represents the list of tasks for each of the writes within a group
List<Task> tasks = null;
@ -219,7 +220,7 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
@ -253,7 +254,7 @@ namespace Microsoft.AspNetCore.SignalR
return new InvocationMessage(methodName, args);
}
public override Task SendUserAsync(string userId, string methodName, object[] args)
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => string.Equals(connection.UserIdentifier, userId, StringComparison.Ordinal));
}
@ -271,17 +272,17 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => !excludedConnectionIds.Contains(connection.ConnectionId));
}
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args)
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => connectionIds.Contains(connection.ConnectionId));
}
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args)
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
return SendToAllConnections(methodName, args, connection => userIds.Contains(connection.UserIdentifier));
}

View File

@ -1,8 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Security.Claims;
using System.Threading;

View File

@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR
// Currently used only for streaming methods
internal ConcurrentDictionary<string, CancellationTokenSource> ActiveRequestCancellationSources { get; } = new ConcurrentDictionary<string, CancellationTokenSource>(StringComparer.Ordinal);
public virtual ValueTask WriteAsync(HubMessage message)
public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancellationToken = default)
{
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))
@ -92,8 +92,9 @@ namespace Microsoft.AspNetCore.SignalR
/// connection.
/// </summary>
/// <param name="message">The serialization cache to use.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns></returns>
public virtual ValueTask WriteAsync(SerializedHubMessage message)
public virtual ValueTask WriteAsync(SerializedHubMessage message, CancellationToken cancellationToken = default)
{
// Try to grab the lock synchronously, if we fail, go to the slower path
if (!_writeLock.Wait(0))

View File

@ -2,37 +2,40 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR
{
public abstract class HubLifetimeManager<THub> where THub : Hub
{
// Called by the framework and not something we'd cancel, so it doesn't take a cancellation token
public abstract Task OnConnectedAsync(HubConnectionContext connection);
// Called by the framework and not something we'd cancel, so it doesn't take a cancellation token
public abstract Task OnDisconnectedAsync(HubConnectionContext connection);
public abstract Task SendAllAsync(string methodName, object[] args);
public abstract Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds);
public abstract Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default);
public abstract Task SendConnectionAsync(string connectionId, string methodName, object[] args);
public abstract Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args);
public abstract Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendGroupAsync(string groupName, string methodName, object[] args);
public abstract Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args);
public abstract Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds);
public abstract Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default);
public abstract Task SendUserAsync(string userId, string methodName, object[] args);
public abstract Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args);
public abstract Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default);
public abstract Task AddToGroupAsync(string connectionId, string groupName);
public abstract Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);
public abstract Task RemoveFromGroupAsync(string connectionId, string groupName);
public abstract Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);
}
}

View File

@ -2,8 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.AspNetCore.SignalR
{

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR
@ -17,7 +18,8 @@ namespace Microsoft.AspNetCore.SignalR
/// </summary>
/// <param name="method">Name of the method to invoke.</param>
/// <param name="args">A collection of arguments to pass to the client.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>A task that represents when the data has been sent to the client.</returns>
Task SendCoreAsync(string method, object[] args);
Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default);
}
}

View File

@ -1,13 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR
{
public interface IGroupManager
{
Task AddToGroupAsync(string connectionId, string groupName);
Task RemoveFromGroupAsync(string connectionId, string groupName);
Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);
Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default);
}
}

View File

@ -0,0 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Internal
{
internal class GroupManager<THub> : IGroupManager where THub : Hub
{
private readonly HubLifetimeManager<THub> _lifetimeManager;
public GroupManager(HubLifetimeManager<THub> lifetimeManager)
{
_lifetimeManager = lifetimeManager;
}
public Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
return _lifetimeManager.AddToGroupAsync(connectionId, groupName, cancellationToken);
}
public Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
return _lifetimeManager.RemoveFromGroupAsync(connectionId, groupName, cancellationToken);
}
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Internal
@ -17,9 +18,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_userId = userId;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendUserAsync(_userId, method, args);
return _lifetimeManager.SendUserAsync(_userId, method, args, cancellationToken);
}
}
@ -34,9 +35,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_userIds = userIds;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendUsersAsync(_userIds, method, args);
return _lifetimeManager.SendUsersAsync(_userIds, method, args, cancellationToken);
}
}
@ -51,9 +52,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_groupName = groupName;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendGroupAsync(_groupName, method, args);
return _lifetimeManager.SendGroupAsync(_groupName, method, args, cancellationToken);
}
}
@ -68,9 +69,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_groupNames = groupNames;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendGroupsAsync(_groupNames, method, args);
return _lifetimeManager.SendGroupsAsync(_groupNames, method, args, cancellationToken);
}
}
@ -87,9 +88,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_excludedConnectionIds = excludedConnectionIds;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendGroupExceptAsync(_groupName, method, args, _excludedConnectionIds);
return _lifetimeManager.SendGroupExceptAsync(_groupName, method, args, _excludedConnectionIds, cancellationToken);
}
}
@ -102,7 +103,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_lifetimeManager = lifetimeManager;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendAllAsync(method, args);
}
@ -119,9 +120,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_excludedConnectionIds = excludedConnectionIds;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendAllExceptAsync(method, args, _excludedConnectionIds);
return _lifetimeManager.SendAllExceptAsync(method, args, _excludedConnectionIds, cancellationToken);
}
}
@ -136,9 +137,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_connectionId = connectionId;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendConnectionAsync(_connectionId, method, args);
return _lifetimeManager.SendConnectionAsync(_connectionId, method, args, cancellationToken);
}
}
@ -153,29 +154,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_connectionIds = connectionIds;
}
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken = default)
{
return _lifetimeManager.SendConnectionsAsync(_connectionIds, method, args);
}
}
internal class GroupManager<THub> : IGroupManager where THub : Hub
{
private readonly HubLifetimeManager<THub> _lifetimeManager;
public GroupManager(HubLifetimeManager<THub> lifetimeManager)
{
_lifetimeManager = lifetimeManager;
}
public Task AddToGroupAsync(string connectionId, string groupName)
{
return _lifetimeManager.AddToGroupAsync(connectionId, groupName);
}
public Task RemoveFromGroupAsync(string connectionId, string groupName)
{
return _lifetimeManager.RemoveFromGroupAsync(connectionId, groupName);
return _lifetimeManager.SendConnectionsAsync(_connectionIds, method, args, cancellationToken);
}
}
}

View File

@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Reflection.Emit;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Internal
@ -17,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal
// There is one static instance of _builder per T
private static readonly Lazy<Func<IClientProxy, T>> _builder = new Lazy<Func<IClientProxy, T>>(() => GenerateClientBuilder());
private static readonly PropertyInfo CancellationTokenNoneProperty = typeof(CancellationToken).GetProperty("None", BindingFlags.Public | BindingFlags.Static);
public static T Build(IClientProxy proxy)
{
return _builder.Value(proxy);
@ -115,7 +118,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
var invokeMethod = typeof(IClientProxy).GetMethod(
nameof(IClientProxy.SendCoreAsync), BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, null,
new[] { typeof(string), typeof(object[]) }, null);
new[] { typeof(string), typeof(object[]), typeof(CancellationToken) }, null);
methodBuilder.SetReturnType(interfaceMethodInfo.ReturnType);
methodBuilder.SetParameters(paramTypes);
@ -156,8 +159,13 @@ namespace Microsoft.AspNetCore.SignalR.Internal
generator.Emit(OpCodes.Stelem_Ref);
}
// Call SendCoreAsync
// Load parameter array on to the stack.
generator.Emit(OpCodes.Ldloc_0);
// Get 'CancellationToken.None' and put it on the stack, since we don't support CancellationToken right now
generator.Emit(OpCodes.Call, CancellationTokenNoneProperty.GetMethod);
// Send!
generator.Emit(OpCodes.Callvirt, invokeMethod);
generator.Emit(OpCodes.Ret); // Return the Task returned by 'invokeMethod'

View File

@ -9,7 +9,6 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.Extensions.Logging;
@ -106,19 +105,19 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return Task.WhenAll(tasks);
}
public override Task SendAllAsync(string methodName, object[] args)
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.All, message);
}
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.All, message);
}
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args)
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -137,7 +136,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return PublishAsync(_channels.Connection(connectionId), message);
}
public override Task SendGroupAsync(string groupName, string methodName, object[] args)
public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
@ -148,7 +147,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return PublishAsync(_channels.Group(groupName), message);
}
public override async Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
public override async Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
@ -159,13 +158,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis
await PublishAsync(_channels.Group(groupName), message);
}
public override Task SendUserAsync(string userId, string methodName, object[] args)
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.User(userId), message);
}
public override async Task AddToGroupAsync(string connectionId, string groupName)
public override async Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -188,7 +187,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
await SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
}
public override async Task RemoveFromGroupAsync(string connectionId, string groupName)
public override async Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
@ -211,7 +210,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
await SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
}
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args)
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionIds == null)
{
@ -229,7 +228,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return Task.WhenAll(publishTasks);
}
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args)
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupNames == null)
{
@ -249,7 +248,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return Task.WhenAll(publishTasks);
}
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args)
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (userIds.Count > 0)
{

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
@ -62,7 +63,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager2.OnConnectedAsync(connection2).OrTimeout();
await manager3.OnConnectedAsync(connection3).OrTimeout();
await manager1.SendAllExceptAsync("Hello", new object[] { "World" }, new [] { client3.Connection.ConnectionId }).OrTimeout();
await manager1.SendAllExceptAsync("Hello", new object[] { "World" }, new[] { client3.Connection.ConnectionId }).OrTimeout();
await AssertMessageAsync(client1);
await AssertMessageAsync(client2);
@ -455,7 +456,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
// Force an exception when writing to connection
var connectionMock = HubConnectionContextUtils.CreateMock(client.Connection);
connectionMock.Setup(m => m.WriteAsync(It.IsAny<HubMessage>())).Throws(new Exception());
connectionMock.Setup(m => m.WriteAsync(It.IsAny<HubMessage>(), It.IsAny<CancellationToken>())).Throws(new Exception());
var connection = connectionMock.Object;
await manager2.OnConnectedAsync(connection).OrTimeout();
@ -478,7 +479,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
// Force an exception when writing to connection
var connectionMock = HubConnectionContextUtils.CreateMock(client1.Connection);
connectionMock.Setup(m => m.WriteAsync(It.IsAny<HubMessage>())).Throws(new Exception());
connectionMock.Setup(m => m.WriteAsync(It.IsAny<HubMessage>(), It.IsAny<CancellationToken>())).Throws(new Exception());
var connection1 = connectionMock.Object;
var connection2 = HubConnectionContextUtils.Create(client2.Connection);

View File

@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Client;
@ -13,13 +14,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
private readonly Func<TransferFormat, Task<ConnectionContext>> _connectDelegate;
private readonly Func<ConnectionContext, Task> _disposeDelegate;
// We have no tests that use the CancellationToken. When we do, we can add it to the delegate. This is test code.
public DelegateConnectionFactory(Func<TransferFormat, Task<ConnectionContext>> connectDelegate, Func<ConnectionContext, Task> disposeDelegate)
{
_connectDelegate = connectDelegate;
_disposeDelegate = disposeDelegate;
}
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat)
public Task<ConnectionContext> ConnectAsync(TransferFormat transferFormat, CancellationToken cancellationToken)
{
return _connectDelegate(transferFormat);
}

View File

@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Moq;
@ -22,8 +23,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendUserAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<string, string, object[]>((userId, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendUserAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<string, string, object[], CancellationToken>((userId, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new UserProxy<FakeHub>(o.Object, string.Empty);
@ -43,8 +44,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendUsersAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<IReadOnlyList<string>, string, object[]>((userIds, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendUsersAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<IReadOnlyList<string>, string, object[], CancellationToken>((userIds, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new MultipleUserProxy<FakeHub>(o.Object, new List<string>());
@ -64,8 +65,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendGroupAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<string, string, object[]>((groupName, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendGroupAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<string, string, object[], CancellationToken>((groupName, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new GroupProxy<FakeHub>(o.Object, string.Empty);
@ -85,8 +86,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendGroupsAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<IReadOnlyList<string>, string, object[]>((groupNames, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendGroupsAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<IReadOnlyList<string>, string, object[], CancellationToken>((groupNames, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new MultipleGroupProxy<FakeHub>(o.Object, new List<string>());
@ -106,8 +107,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendGroupExceptAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<IReadOnlyList<string>>()))
.Callback<string, string, object[], IReadOnlyList<string>>((groupName, methodName, args, excludedConnectionIds) => { resultArgs = args; })
o.Setup(m => m.SendGroupExceptAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<IReadOnlyList<string>>(), It.IsAny<CancellationToken>()))
.Callback<string, string, object[], IReadOnlyList<string>, CancellationToken>((groupName, methodName, args, excludedConnectionIds, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new GroupExceptProxy<FakeHub>(o.Object, string.Empty, new List<string>());
@ -127,8 +128,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendAllAsync(It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<string, object[]>((methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendAllAsync(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<string, object[], CancellationToken>((methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new AllClientProxy<FakeHub>(o.Object);
@ -148,8 +149,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendAllExceptAsync(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<IReadOnlyList<string>>()))
.Callback<string, object[], IReadOnlyList<string>>((methodName, args, excludedConnectionIds) => { resultArgs = args; })
o.Setup(m => m.SendAllExceptAsync(It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<IReadOnlyList<string>>(), It.IsAny<CancellationToken>()))
.Callback<string, object[], IReadOnlyList<string>, CancellationToken>((methodName, args, excludedConnectionIds, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new AllClientsExceptProxy<FakeHub>(o.Object, new List<string>());
@ -169,8 +170,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendConnectionAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<string, string, object[]>((connectionId, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendConnectionAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<string, string, object[], CancellationToken>((connectionId, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new SingleClientProxy<FakeHub>(o.Object, string.Empty);
@ -190,8 +191,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
object[] resultArgs = null;
var o = new Mock<HubLifetimeManager<FakeHub>>();
o.Setup(m => m.SendConnectionsAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>()))
.Callback<IReadOnlyList<string>, string, object[]>((connectionIds, methodName, args) => { resultArgs = args; })
o.Setup(m => m.SendConnectionsAsync(It.IsAny<IReadOnlyList<string>>(), It.IsAny<string>(), It.IsAny<object[]>(), It.IsAny<CancellationToken>()))
.Callback<IReadOnlyList<string>, string, object[], CancellationToken>((connectionIds, methodName, args, _) => { resultArgs = args; })
.Returns(Task.CompletedTask);
var proxy = new MultipleClientProxy<FakeHub>(o.Object, new List<string>());
@ -205,4 +206,4 @@ namespace Microsoft.AspNetCore.SignalR.Tests
Assert.Same(data, arg);
}
}
}
}

View File

@ -30,6 +30,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Internal
Assert.Equal("Method", send.Method);
Assert.Equal("foo", send.Arguments[0]);
Assert.Equal(42, send.Arguments[1]);
Assert.Equal(CancellationToken.None, send.CancellationToken);
Assert.Same(objArg, send.Arguments[2]);
send.Complete();
});
@ -58,6 +59,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Internal
arg1 => Assert.Equal("foo", arg1),
arg2 => Assert.Equal(42, arg2),
arg3 => Assert.Same(objArg, arg3));
Assert.Equal(CancellationToken.None, send1.CancellationToken);
send1.Complete();
},
send2 =>
@ -65,6 +67,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Internal
Assert.Equal("SubMethod", send2.Method);
Assert.Collection(send2.Arguments,
arg1 => Assert.Equal("bar", arg1));
Assert.Equal(CancellationToken.None, send2.CancellationToken);
send2.Complete();
});
@ -190,11 +193,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Internal
{
public IList<SendContext> Sends { get; } = new List<SendContext>();
public Task SendCoreAsync(string method, object[] args)
public Task SendCoreAsync(string method, object[] args, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<object>();
Sends.Add(new SendContext(method, args, tcs));
Sends.Add(new SendContext(method, args, cancellationToken, tcs));
return tcs.Task;
}
@ -206,11 +209,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests.Internal
public string Method { get; }
public object[] Arguments { get; }
public CancellationToken CancellationToken { get; }
public SendContext(string method, object[] arguments, TaskCompletionSource<object> tcs) : this()
public SendContext(string method, object[] arguments, CancellationToken cancellationToken, TaskCompletionSource<object> tcs) : this()
{
Method = method;
Arguments = arguments;
CancellationToken = cancellationToken;
_tcs = tcs;
}