Invoke Multiple Connections (#1242)

This commit is contained in:
Mikael Mengistu 2017-12-21 15:19:43 -08:00 committed by GitHub
parent 69dc7a4b23
commit 738617266e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 148 additions and 2 deletions

View File

@ -152,6 +152,11 @@ namespace ChatSample
return _wrappedHubLifetimeManager.InvokeConnectionAsync(connectionId, methodName, args);
}
public override Task InvokeConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args)
{
return _wrappedHubLifetimeManager.InvokeConnectionsAsync(connectionIds, methodName, args);
}
public override Task InvokeGroupAsync(string groupName, string methodName, object[] args)
{
return _wrappedHubLifetimeManager.InvokeGroupAsync(groupName, methodName, args);

View File

@ -177,5 +177,13 @@ namespace Microsoft.AspNetCore.SignalR
return !excludedIds.Contains(connection.ConnectionId);
});
}
public override Task InvokeConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args)
{
return InvokeAllWhere(methodName, args, connection =>
{
return connectionIds.Contains(connection.ConnectionId);
});
}
}
}

View File

@ -18,6 +18,7 @@ namespace Microsoft.AspNetCore.SignalR
public dynamic AllExcept(IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.AllExcept(excludedIds));
public dynamic Caller => new DynamicClientProxy(_clients.Caller);
public dynamic Client(string connectionId) => new DynamicClientProxy(_clients.Client(connectionId));
public dynamic MultipleClients(IReadOnlyList<string> connectionIds) => new DynamicClientProxy(_clients.MultipleClients(connectionIds));
public dynamic Group(string groupName) => new DynamicClientProxy(_clients.Group(groupName));
public dynamic GroupExcept(string groupName, IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.GroupExcept(groupName, excludedIds));
public dynamic OthersInGroup(string groupName) => new DynamicClientProxy(_clients.OthersInGroup(groupName));

View File

@ -53,5 +53,10 @@ namespace Microsoft.AspNetCore.SignalR
{
return _hubClients.User(userId);
}
public IClientProxy MultipleClients(IReadOnlyList<string> connectionIds)
{
return _hubClients.MultipleClients(connectionIds);
}
}
}

View File

@ -46,5 +46,10 @@ namespace Microsoft.AspNetCore.SignalR
{
return new UserProxy<THub>(_lifetimeManager, userId);
}
public virtual IClientProxy MultipleClients(IReadOnlyList<string> connectionIds)
{
return new MultipleClientProxy<THub>(_lifetimeManager, connectionIds);
}
}
}

View File

@ -34,6 +34,11 @@ namespace Microsoft.AspNetCore.SignalR
return TypedClientBuilder<T>.Build(new SingleClientProxy<THub>(_lifetimeManager, connectionId));
}
public T MultipleClients(IReadOnlyList<string> connectionIds)
{
return TypedClientBuilder<T>.Build(new MultipleClientProxy<THub>(_lifetimeManager, connectionIds));
}
public virtual T Group(string groupName)
{
return TypedClientBuilder<T>.Build(new GroupProxy<THub>(_lifetimeManager, groupName));

View File

@ -18,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR
public abstract Task InvokeConnectionAsync(string connectionId, string methodName, object[] args);
public abstract Task InvokeConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args);
public abstract Task InvokeGroupAsync(string groupName, string methodName, object[] args);
public abstract Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds);

View File

@ -13,6 +13,8 @@ namespace Microsoft.AspNetCore.SignalR
T Client(string connectionId);
T MultipleClients(IReadOnlyList<string> connectionIds);
T Group(string groupName);
T GroupExcept(string groupName, IReadOnlyList<string> excludeIds);

View File

@ -108,6 +108,23 @@ namespace Microsoft.AspNetCore.SignalR
}
}
public class MultipleClientProxy<THub> : IClientProxy
{
private readonly HubLifetimeManager<THub> _lifetimeManager;
private IReadOnlyList<string> _connectionIds;
public MultipleClientProxy(HubLifetimeManager<THub> lifetimeManager, IReadOnlyList<string> connectionIds)
{
_lifetimeManager = lifetimeManager;
_connectionIds = connectionIds;
}
public Task InvokeAsync(string method, params object[] args)
{
return _lifetimeManager.InvokeConnectionsAsync(_connectionIds, method, args);
}
}
public class GroupManager<THub> : IGroupManager
{
private readonly HubLifetimeManager<THub> _lifetimeManager;

View File

@ -37,6 +37,11 @@ namespace Microsoft.AspNetCore.SignalR
return TypedClientBuilder<T>.Build(_hubClients.GroupExcept(groupName, excludeIds));
}
public T MultipleClients(IReadOnlyList<string> connectionIds)
{
return TypedClientBuilder<T>.Build(_hubClients.MultipleClients(connectionIds));
}
public T OthersInGroup(string groupName)
{
return TypedClientBuilder<T>.Build(_hubClients.OthersInGroup(groupName));

View File

@ -574,6 +574,33 @@ namespace Microsoft.AspNetCore.SignalR.Redis
});
}
public override Task InvokeConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args)
{
if (connectionIds == null)
{
throw new ArgumentNullException(nameof(connectionIds));
}
var publishTasks = new List<Task>(connectionIds.Count);
var message = new RedisInvocationMessage(target: methodName, arguments: args);
foreach(string connectionId in connectionIds)
{
var connection = _connections[connectionId];
// If the connection is local we can skip sending the message through the bus since we require sticky connections.
// This also saves serializing and deserializing the message!
if (connection != null)
{
publishTasks.Add(connection.WriteAsync(message.CreateInvocation()));
}
else
{
publishTasks.Add(PublishAsync(_channelNamePrefix + "." + connectionId, message));
}
}
return Task.WhenAll(publishTasks);
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
@ -645,7 +672,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
}
public RedisInvocationMessage(string target, object[] arguments)
: this(target, excludedIds: null, arguments)
: this(target, excludedIds: null, arguments: arguments)
{
}
@ -658,7 +685,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
public InvocationMessage CreateInvocation()
{
return new InvocationMessage(Target, argumentBindingException: null, Arguments);
return new InvocationMessage(Target, argumentBindingException: null, arguments: Arguments);
}
}
}

View File

@ -27,6 +27,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils
return Clients.Client(connectionId).InvokeAsync("Send", message);
}
public Task SendToMultipleClients(string message, IReadOnlyList<string> connectionIds)
{
return Clients.MultipleClients(connectionIds).InvokeAsync("Send", message);
}
public Task GroupAddMethod(string groupName)
{
return Groups.AddAsync(Context.ConnectionId, groupName);
@ -176,6 +181,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils
return Clients.Client(connectionId).Send(message);
}
public Task SendToMultipleClients(string message, IReadOnlyList<string> connectionIds)
{
return Clients.MultipleClients(connectionIds).Send(message);
}
public Task GroupAddMethod(string groupName)
{
return Groups.AddAsync(Context.ConnectionId, groupName);
@ -240,11 +250,18 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils
{
return Clients.Client(connectionId).Send(message);
}
public Task SendToMultipleClients(string message, IReadOnlyList<string> connectionIds)
{
return Clients.MultipleClients(connectionIds).Send(message);
}
public async Task DelayedSend(string connectionId, string message)
{
await Task.Delay(100);
await Clients.Client(connectionId).Send(message);
}
public Task GroupAddMethod(string groupName)
{
return Groups.AddAsync(Context.ConnectionId, groupName);

View File

@ -937,6 +937,53 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Theory]
[MemberData(nameof(HubTypes))]
public async Task SendToMultipleClients(Type hubType)
{
dynamic endPoint = HubEndPointTestUtils.GetHubEndpoint(hubType);
using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
using (var thirdClient = new TestClient())
{
Task firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
Task secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
Task thirdEndPointTask = endPoint.OnConnectedAsync(thirdClient.Connection);
await Task.WhenAll(firstClient.Connected, secondClient.Connected, thirdClient.Connected).OrTimeout();
var secondAndThirdClients = new HashSet<string> {secondClient.Connection.ConnectionId,
thirdClient.Connection.ConnectionId };
secondAndThirdClients.Add(secondClient.Connection.ConnectionId);
await firstClient.SendInvocationAsync("SendToMultipleClients", "Second and Third", secondAndThirdClients).OrTimeout();
var secondClientResult = await secondClient.ReadAsync().OrTimeout();
var invocation = Assert.IsType<InvocationMessage>(secondClientResult);
Assert.Equal("Send", invocation.Target);
Assert.Equal("Second and Third", invocation.Arguments[0]);
var thirdClientResult = await thirdClient.ReadAsync().OrTimeout();
invocation = Assert.IsType<InvocationMessage>(thirdClientResult);
Assert.Equal("Send", invocation.Target);
Assert.Equal("Second and Third", invocation.Arguments[0]);
// Check that first client only got the completion message
var hubMessage= await firstClient.ReadAsync().OrTimeout();
Assert.IsType<CompletionMessage>(hubMessage);
Assert.Null(firstClient.TryRead());
// kill the connections
firstClient.Dispose();
secondClient.Dispose();
thirdClient.Dispose();
await Task.WhenAll(firstEndPointTask, secondEndPointTask, thirdEndPointTask).OrTimeout();
}
}
[Theory]
[MemberData(nameof(HubTypes))]
public async Task HubsCanAddAndSendToGroup(Type hubType)