From 738617266e88717f6cc5e0e857ce49639c0b4065 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Thu, 21 Dec 2017 15:19:43 -0800 Subject: [PATCH] Invoke Multiple Connections (#1242) --- .../ChatSample/PresenceHubLifetimeManager.cs | 5 ++ .../DefaultHubLifetimeManager.cs | 8 ++++ .../DynamicHubClients.cs | 1 + .../HubCallerClients.cs | 5 ++ .../HubContext.cs | 5 ++ .../HubContext`T.cs | 5 ++ .../HubLifetimeManager.cs | 2 + .../IHubClients`T.cs | 2 + .../Proxies.cs | 17 +++++++ .../TypedHubClients.cs | 5 ++ .../RedisHubLifetimeManager.cs | 31 +++++++++++- .../HubEndpointTestUtils/Hubs.cs | 17 +++++++ .../HubEndpointTests.cs | 47 +++++++++++++++++++ 13 files changed, 148 insertions(+), 2 deletions(-) diff --git a/samples/ChatSample/PresenceHubLifetimeManager.cs b/samples/ChatSample/PresenceHubLifetimeManager.cs index cfca0a5590..e323e289a1 100644 --- a/samples/ChatSample/PresenceHubLifetimeManager.cs +++ b/samples/ChatSample/PresenceHubLifetimeManager.cs @@ -152,6 +152,11 @@ namespace ChatSample return _wrappedHubLifetimeManager.InvokeConnectionAsync(connectionId, methodName, args); } + public override Task InvokeConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args) + { + return _wrappedHubLifetimeManager.InvokeConnectionsAsync(connectionIds, methodName, args); + } + public override Task InvokeGroupAsync(string groupName, string methodName, object[] args) { return _wrappedHubLifetimeManager.InvokeGroupAsync(groupName, methodName, args); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs index 6f028038fb..ae264cd16d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs @@ -177,5 +177,13 @@ namespace Microsoft.AspNetCore.SignalR return !excludedIds.Contains(connection.ConnectionId); }); } + + public override Task InvokeConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args) + { + return InvokeAllWhere(methodName, args, connection => + { + return connectionIds.Contains(connection.ConnectionId); + }); + } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs index 604a851f80..9ac810a26d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs @@ -18,6 +18,7 @@ namespace Microsoft.AspNetCore.SignalR public dynamic AllExcept(IReadOnlyList excludedIds) => new DynamicClientProxy(_clients.AllExcept(excludedIds)); public dynamic Caller => new DynamicClientProxy(_clients.Caller); public dynamic Client(string connectionId) => new DynamicClientProxy(_clients.Client(connectionId)); + public dynamic MultipleClients(IReadOnlyList connectionIds) => new DynamicClientProxy(_clients.MultipleClients(connectionIds)); public dynamic Group(string groupName) => new DynamicClientProxy(_clients.Group(groupName)); public dynamic GroupExcept(string groupName, IReadOnlyList excludedIds) => new DynamicClientProxy(_clients.GroupExcept(groupName, excludedIds)); public dynamic OthersInGroup(string groupName) => new DynamicClientProxy(_clients.OthersInGroup(groupName)); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs index 936c2e4d66..052233965a 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs @@ -53,5 +53,10 @@ namespace Microsoft.AspNetCore.SignalR { return _hubClients.User(userId); } + + public IClientProxy MultipleClients(IReadOnlyList connectionIds) + { + return _hubClients.MultipleClients(connectionIds); + } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubContext.cs index de4a55c61a..56ebe6f254 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubContext.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubContext.cs @@ -46,5 +46,10 @@ namespace Microsoft.AspNetCore.SignalR { return new UserProxy(_lifetimeManager, userId); } + + public virtual IClientProxy MultipleClients(IReadOnlyList connectionIds) + { + return new MultipleClientProxy(_lifetimeManager, connectionIds); + } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubContext`T.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubContext`T.cs index ceba0bd81e..9d3c2131e8 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubContext`T.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubContext`T.cs @@ -34,6 +34,11 @@ namespace Microsoft.AspNetCore.SignalR return TypedClientBuilder.Build(new SingleClientProxy(_lifetimeManager, connectionId)); } + public T MultipleClients(IReadOnlyList connectionIds) + { + return TypedClientBuilder.Build(new MultipleClientProxy(_lifetimeManager, connectionIds)); + } + public virtual T Group(string groupName) { return TypedClientBuilder.Build(new GroupProxy(_lifetimeManager, groupName)); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs index 939209ef58..8b620a1d6f 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs @@ -18,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR public abstract Task InvokeConnectionAsync(string connectionId, string methodName, object[] args); + public abstract Task InvokeConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args); + public abstract Task InvokeGroupAsync(string groupName, string methodName, object[] args); public abstract Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs b/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs index 87d6fc41a1..a9b8d57780 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs @@ -13,6 +13,8 @@ namespace Microsoft.AspNetCore.SignalR T Client(string connectionId); + T MultipleClients(IReadOnlyList connectionIds); + T Group(string groupName); T GroupExcept(string groupName, IReadOnlyList excludeIds); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs b/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs index cfa812c44b..533e40a91c 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs @@ -108,6 +108,23 @@ namespace Microsoft.AspNetCore.SignalR } } + public class MultipleClientProxy : IClientProxy + { + private readonly HubLifetimeManager _lifetimeManager; + private IReadOnlyList _connectionIds; + + public MultipleClientProxy(HubLifetimeManager lifetimeManager, IReadOnlyList connectionIds) + { + _lifetimeManager = lifetimeManager; + _connectionIds = connectionIds; + } + + public Task InvokeAsync(string method, params object[] args) + { + return _lifetimeManager.InvokeConnectionsAsync(_connectionIds, method, args); + } + } + public class GroupManager : IGroupManager { private readonly HubLifetimeManager _lifetimeManager; diff --git a/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs index ac68e41380..dd3ce0d51d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs @@ -37,6 +37,11 @@ namespace Microsoft.AspNetCore.SignalR return TypedClientBuilder.Build(_hubClients.GroupExcept(groupName, excludeIds)); } + public T MultipleClients(IReadOnlyList connectionIds) + { + return TypedClientBuilder.Build(_hubClients.MultipleClients(connectionIds)); + } + public T OthersInGroup(string groupName) { return TypedClientBuilder.Build(_hubClients.OthersInGroup(groupName)); diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 6a4d11e040..0eb08287b6 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -574,6 +574,33 @@ namespace Microsoft.AspNetCore.SignalR.Redis }); } + public override Task InvokeConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args) + { + if (connectionIds == null) + { + throw new ArgumentNullException(nameof(connectionIds)); + } + var publishTasks = new List(connectionIds.Count); + var message = new RedisInvocationMessage(target: methodName, arguments: args); + + foreach(string connectionId in connectionIds) + { + var connection = _connections[connectionId]; + // If the connection is local we can skip sending the message through the bus since we require sticky connections. + // This also saves serializing and deserializing the message! + if (connection != null) + { + publishTasks.Add(connection.WriteAsync(message.CreateInvocation())); + } + else + { + publishTasks.Add(PublishAsync(_channelNamePrefix + "." + connectionId, message)); + } + } + + return Task.WhenAll(publishTasks); + } + private class LoggerTextWriter : TextWriter { private readonly ILogger _logger; @@ -645,7 +672,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis } public RedisInvocationMessage(string target, object[] arguments) - : this(target, excludedIds: null, arguments) + : this(target, excludedIds: null, arguments: arguments) { } @@ -658,7 +685,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis public InvocationMessage CreateInvocation() { - return new InvocationMessage(Target, argumentBindingException: null, Arguments); + return new InvocationMessage(Target, argumentBindingException: null, arguments: Arguments); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs index 7222f1e7e3..7ab0df4d33 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs @@ -27,6 +27,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils return Clients.Client(connectionId).InvokeAsync("Send", message); } + public Task SendToMultipleClients(string message, IReadOnlyList connectionIds) + { + return Clients.MultipleClients(connectionIds).InvokeAsync("Send", message); + } + public Task GroupAddMethod(string groupName) { return Groups.AddAsync(Context.ConnectionId, groupName); @@ -176,6 +181,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils return Clients.Client(connectionId).Send(message); } + public Task SendToMultipleClients(string message, IReadOnlyList connectionIds) + { + return Clients.MultipleClients(connectionIds).Send(message); + } + public Task GroupAddMethod(string groupName) { return Groups.AddAsync(Context.ConnectionId, groupName); @@ -240,11 +250,18 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils { return Clients.Client(connectionId).Send(message); } + + public Task SendToMultipleClients(string message, IReadOnlyList connectionIds) + { + return Clients.MultipleClients(connectionIds).Send(message); + } + public async Task DelayedSend(string connectionId, string message) { await Task.Delay(100); await Clients.Client(connectionId).Send(message); } + public Task GroupAddMethod(string groupName) { return Groups.AddAsync(Context.ConnectionId, groupName); diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs index 43cdd16812..ed13d54bea 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs @@ -937,6 +937,53 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Theory] + [MemberData(nameof(HubTypes))] + public async Task SendToMultipleClients(Type hubType) + { + dynamic endPoint = HubEndPointTestUtils.GetHubEndpoint(hubType); + + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) + using (var thirdClient = new TestClient()) + { + Task firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); + Task secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); + Task thirdEndPointTask = endPoint.OnConnectedAsync(thirdClient.Connection); + + await Task.WhenAll(firstClient.Connected, secondClient.Connected, thirdClient.Connected).OrTimeout(); + + var secondAndThirdClients = new HashSet {secondClient.Connection.ConnectionId, + thirdClient.Connection.ConnectionId }; + + secondAndThirdClients.Add(secondClient.Connection.ConnectionId); + + await firstClient.SendInvocationAsync("SendToMultipleClients", "Second and Third", secondAndThirdClients).OrTimeout(); + + var secondClientResult = await secondClient.ReadAsync().OrTimeout(); + var invocation = Assert.IsType(secondClientResult); + Assert.Equal("Send", invocation.Target); + Assert.Equal("Second and Third", invocation.Arguments[0]); + + var thirdClientResult = await thirdClient.ReadAsync().OrTimeout(); + invocation = Assert.IsType(thirdClientResult); + Assert.Equal("Send", invocation.Target); + Assert.Equal("Second and Third", invocation.Arguments[0]); + + // Check that first client only got the completion message + var hubMessage= await firstClient.ReadAsync().OrTimeout(); + Assert.IsType(hubMessage); + Assert.Null(firstClient.TryRead()); + + // kill the connections + firstClient.Dispose(); + secondClient.Dispose(); + thirdClient.Dispose(); + + await Task.WhenAll(firstEndPointTask, secondEndPointTask, thirdEndPointTask).OrTimeout(); + } + } + [Theory] [MemberData(nameof(HubTypes))] public async Task HubsCanAddAndSendToGroup(Type hubType)