From 8b34b7f2ae80520a5db1a2f2ec69db33cc109100 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Fri, 5 Jan 2018 13:57:19 -0800 Subject: [PATCH] Invoke Multiple Groups (#1254) --- .../ChatSample/PresenceHubLifetimeManager.cs | 5 +++ .../DefaultHubLifetimeManager.cs | 23 +++++++++++ .../DynamicHubClients.cs | 1 + .../HubCallerClients.cs | 5 +++ .../HubClients.cs | 5 +++ .../HubClients`T.cs | 5 +++ .../HubLifetimeManager.cs | 2 + .../IHubClients`T.cs | 2 + .../Proxies.cs | 17 ++++++++ .../TypedHubClients.cs | 5 +++ .../RedisHubLifetimeManager.cs | 20 ++++++++++ .../HubEndpointTestUtils/Hubs.cs | 15 +++++++ .../HubEndpointTests.cs | 40 +++++++++++++++++++ 13 files changed, 145 insertions(+) diff --git a/samples/ChatSample/PresenceHubLifetimeManager.cs b/samples/ChatSample/PresenceHubLifetimeManager.cs index e323e289a1..9c41231145 100644 --- a/samples/ChatSample/PresenceHubLifetimeManager.cs +++ b/samples/ChatSample/PresenceHubLifetimeManager.cs @@ -162,6 +162,11 @@ namespace ChatSample return _wrappedHubLifetimeManager.InvokeGroupAsync(groupName, methodName, args); } + public override Task InvokeGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args) + { + return _wrappedHubLifetimeManager.InvokeGroupsAsync(groupNames, methodName, args); + } + public override Task InvokeUserAsync(string userId, string methodName, object[] args) { return _wrappedHubLifetimeManager.InvokeUserAsync(userId, methodName, args); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs index ae264cd16d..f76003621d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs @@ -127,6 +127,29 @@ namespace Microsoft.AspNetCore.SignalR return Task.CompletedTask; } + public override Task InvokeGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args) + { + // Each task represents the list of tasks for each of the writes within a group + var tasks = new List(); + var message = CreateInvocationMessage(methodName, args); + + foreach (var groupName in groupNames) + { + if (string.IsNullOrEmpty(groupName)) + { + throw new ArgumentException(nameof(groupName)); + } + + var group = _groups[groupName]; + if (group != null) + { + tasks.Add(Task.WhenAll(group.Values.Select(c => c.WriteAsync(message)))); + } + } + + return Task.WhenAll(tasks); + } + public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds) { if (groupName == null) diff --git a/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs index 81c2d0883e..8c4744340d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs @@ -20,6 +20,7 @@ namespace Microsoft.AspNetCore.SignalR public dynamic Client(string connectionId) => new DynamicClientProxy(_clients.Client(connectionId)); public dynamic Clients(IReadOnlyList connectionIds) => new DynamicClientProxy(_clients.Clients(connectionIds)); public dynamic Group(string groupName) => new DynamicClientProxy(_clients.Group(groupName)); + public dynamic Groups(IReadOnlyList groupNames) => new DynamicClientProxy(_clients.Groups(groupNames)); public dynamic GroupExcept(string groupName, IReadOnlyList excludedIds) => new DynamicClientProxy(_clients.GroupExcept(groupName, excludedIds)); public dynamic OthersInGroup(string groupName) => new DynamicClientProxy(_clients.OthersInGroup(groupName)); public dynamic Others => new DynamicClientProxy(_clients.Others); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs index 893a542439..95044ae4c6 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs @@ -39,6 +39,11 @@ namespace Microsoft.AspNetCore.SignalR return _hubClients.Group(groupName); } + public IClientProxy Groups(IReadOnlyList groupNames) + { + return _hubClients.Groups(groupNames); + } + public IClientProxy OthersInGroup(string groupName) { return _hubClients.GroupExcept(groupName, _currentConnectionId); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubClients.cs index 6f69de32f3..afb1f06bf1 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubClients.cs @@ -42,6 +42,11 @@ namespace Microsoft.AspNetCore.SignalR return new MultipleClientProxy(_lifetimeManager, connectionIds); } + public IClientProxy Groups(IReadOnlyList groupNames) + { + return new MultipleGroupProxy(_lifetimeManager, groupNames); + } + public IClientProxy User(string userId) { return new UserProxy(_lifetimeManager, userId); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubClients`T.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubClients`T.cs index f1f1bf0b0c..b3337119e6 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubClients`T.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubClients`T.cs @@ -44,6 +44,11 @@ namespace Microsoft.AspNetCore.SignalR return TypedClientBuilder.Build(new GroupExceptProxy(_lifetimeManager, groupName, excludeIds)); } + public T Groups(IReadOnlyList groupNames) + { + return TypedClientBuilder.Build(new MultipleGroupProxy(_lifetimeManager, groupNames)); + } + public virtual T User(string userId) { return TypedClientBuilder.Build(new UserProxy(_lifetimeManager, userId)); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs index 8b620a1d6f..54693fbdc1 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs @@ -22,6 +22,8 @@ namespace Microsoft.AspNetCore.SignalR public abstract Task InvokeGroupAsync(string groupName, string methodName, object[] args); + public abstract Task InvokeGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args); + public abstract Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds); public abstract Task InvokeUserAsync(string userId, string methodName, object[] args); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs b/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs index 374d7f2b1d..8fd399ff6e 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs @@ -17,6 +17,8 @@ namespace Microsoft.AspNetCore.SignalR T Group(string groupName); + T Groups(IReadOnlyList groupNames); + T GroupExcept(string groupName, IReadOnlyList excludeIds); T User(string userId); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs b/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs index 533e40a91c..e5a9c71bea 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs @@ -40,6 +40,23 @@ namespace Microsoft.AspNetCore.SignalR } } + public class MultipleGroupProxy : IClientProxy + { + private readonly HubLifetimeManager _lifetimeManager; + private IReadOnlyList _groupNames; + + public MultipleGroupProxy(HubLifetimeManager lifetimeManager, IReadOnlyList groupNames) + { + _lifetimeManager = lifetimeManager; + _groupNames = groupNames; + } + + public Task InvokeAsync(string method, params object[] args) + { + return _lifetimeManager.InvokeGroupsAsync(_groupNames, method, args); + } + } + public class GroupExceptProxy : IClientProxy { private readonly string _groupName; diff --git a/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs b/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs index c303d0b265..76768bbdc6 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs @@ -42,6 +42,11 @@ namespace Microsoft.AspNetCore.SignalR return TypedClientBuilder.Build(_hubClients.Clients(connectionIds)); } + public T Groups(IReadOnlyList groupNames) + { + return TypedClientBuilder.Build(_hubClients.Groups(groupNames)); + } + public T OthersInGroup(string groupName) { return TypedClientBuilder.Build(_hubClients.OthersInGroup(groupName)); diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 0eb08287b6..8ac4d40fa1 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -601,6 +601,26 @@ namespace Microsoft.AspNetCore.SignalR.Redis return Task.WhenAll(publishTasks); } + public override Task InvokeGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args) + { + if (groupNames == null) + { + throw new ArgumentNullException(nameof(groupNames)); + } + var publishTasks = new List(groupNames.Count); + var message = new RedisInvocationMessage(target: methodName, arguments: args); + + foreach (var groupName in groupNames) + { + if (!string.IsNullOrEmpty(groupName)) + { + publishTasks.Add(PublishAsync(_channelNamePrefix + "." + groupName, message)); + } + } + + return Task.WhenAll(publishTasks); + } + private class LoggerTextWriter : TextWriter { private readonly ILogger _logger; diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs index 3c137b6b05..299dec8012 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTestUtils/Hubs.cs @@ -47,6 +47,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils return Clients.GroupExcept(groupName, excludedIds).InvokeAsync("Send", message); } + public Task SendToMultipleGroups(string message, IReadOnlyList groupNames) + { + return Clients.Groups(groupNames).InvokeAsync("Send", message); + } + public Task SendToOthersInGroup(string groupName, string message) { return Clients.OthersInGroup(groupName).InvokeAsync("Send", message); @@ -206,6 +211,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils return Clients.OthersInGroup(groupName).Send(message); } + public Task SendToMultipleGroups(string message, IReadOnlyList groupNames) + { + return Clients.Groups(groupNames).Send(message); + } + public Task BroadcastMethod(string message) { return Clients.All.Broadcast(message); @@ -277,6 +287,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests.HubEndpointTestUtils return Clients.GroupExcept(groupName, excludedIds).Send(message); } + public Task SendToMultipleGroups(string message, IReadOnlyList groupNames) + { + return Clients.Groups(groupNames).Send(message); + } + public Task SendToOthersInGroup(string groupName, string message) { return Clients.OthersInGroup(groupName).Send(message); diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs index ed13d54bea..ad73f122ac 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs @@ -1123,6 +1123,46 @@ namespace Microsoft.AspNetCore.SignalR.Tests } } + [Theory] + [MemberData(nameof(HubTypes))] + public async Task InvokeMultipleGroups(Type hubType) + { + dynamic endPoint = HubEndPointTestUtils.GetHubEndpoint(hubType); + + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) + { + Task firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); + Task secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); + + await Task.WhenAll(firstClient.Connected, secondClient.Connected).OrTimeout(); + + await secondClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "GroupA").OrTimeout(); + await firstClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "GroupB").OrTimeout(); ; + + var groupNames = new List { "GroupA", "GroupB" }; + await firstClient.SendInvocationAsync(nameof(MethodHub.SendToMultipleGroups), "test", groupNames).OrTimeout(); + + var hubMessage = await secondClient.ReadAsync().OrTimeout(); + var invocation = Assert.IsType(hubMessage); + Assert.Equal("Send", invocation.Target); + Assert.Single(invocation.Arguments); + Assert.Equal("test", invocation.Arguments[0]); + + hubMessage = await firstClient.ReadAsync().OrTimeout(); + invocation = Assert.IsType(hubMessage); + Assert.Equal("Send", invocation.Target); + Assert.Single(invocation.Arguments); + Assert.Equal("test", invocation.Arguments[0]); + + // kill the connections + firstClient.Dispose(); + secondClient.Dispose(); + + await Task.WhenAll(firstEndPointTask, secondEndPointTask).OrTimeout(); + } + } + [Fact] public async Task RemoveFromGroupWhenNotInGroupDoesNotFail() {