AllExcept for Groups - GroupExcept (#1204)
This commit is contained in:
parent
d2c1138429
commit
eb2668e74e
|
|
@ -171,5 +171,10 @@ namespace ChatSample
|
|||
{
|
||||
return _wrappedHubLifetimeManager.RemoveGroupAsync(connectionId, groupName);
|
||||
}
|
||||
|
||||
public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
return _wrappedHubLifetimeManager.InvokeGroupExceptAsync(groupName, methodName, args, excludedIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -129,6 +129,25 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
if (groupName == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(groupName));
|
||||
}
|
||||
|
||||
var group = _groups[groupName];
|
||||
if (group != null)
|
||||
{
|
||||
var message = CreateInvocationMessage(methodName, args);
|
||||
var tasks = group.Values.Where(connection => !excludedIds.Contains(connection.ConnectionId))
|
||||
.Select(c => c.WriteAsync(message));
|
||||
return Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private InvocationMessage CreateInvocationMessage(string methodName, object[] args)
|
||||
{
|
||||
return new InvocationMessage(GetInvocationId(), nonBlocking: true, target: methodName, argumentBindingException: null, arguments: args);
|
||||
|
|
|
|||
|
|
@ -18,7 +18,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
public dynamic AllExcept(IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.AllExcept(excludedIds));
|
||||
public dynamic Caller => new DynamicClientProxy(_clients.Caller);
|
||||
public dynamic Client(string connectionId) => new DynamicClientProxy(_clients.Client(connectionId));
|
||||
public dynamic Group(string group) => new DynamicClientProxy(_clients.Group(group));
|
||||
public dynamic Group(string groupName) => new DynamicClientProxy(_clients.Group(groupName));
|
||||
public dynamic GroupExcept(string groupName, IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.GroupExcept(groupName, excludedIds));
|
||||
public dynamic Others => new DynamicClientProxy(_clients.Others);
|
||||
public dynamic User(string userId) => new DynamicClientProxy(_clients.User(userId));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return _hubClients.Group(groupName);
|
||||
}
|
||||
|
||||
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
|
||||
{
|
||||
return _hubClients.GroupExcept(groupName, excludeIds);
|
||||
}
|
||||
|
||||
public IClientProxy User(string userId)
|
||||
{
|
||||
return _hubClients.User(userId);
|
||||
|
|
|
|||
|
|
@ -37,6 +37,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return new GroupProxy<THub>(_lifetimeManager, groupName);
|
||||
}
|
||||
|
||||
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
|
||||
{
|
||||
return new GroupExceptProxy<THub>(_lifetimeManager, groupName, excludeIds);
|
||||
}
|
||||
|
||||
public virtual IClientProxy User(string userId)
|
||||
{
|
||||
return new UserProxy<THub>(_lifetimeManager, userId);
|
||||
|
|
|
|||
|
|
@ -39,6 +39,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return TypedClientBuilder<T>.Build(new GroupProxy<THub>(_lifetimeManager, groupName));
|
||||
}
|
||||
|
||||
public T GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
|
||||
{
|
||||
return TypedClientBuilder<T>.Build(new GroupExceptProxy<THub>(_lifetimeManager, groupName, excludeIds));
|
||||
}
|
||||
|
||||
public virtual T User(string userId)
|
||||
{
|
||||
return TypedClientBuilder<T>.Build(new UserProxy<THub>(_lifetimeManager, userId));
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
|
||||
public abstract Task InvokeGroupAsync(string groupName, string methodName, object[] args);
|
||||
|
||||
public abstract Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds);
|
||||
|
||||
public abstract Task InvokeUserAsync(string userId, string methodName, object[] args);
|
||||
|
||||
public abstract Task AddGroupAsync(string connectionId, string groupName);
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
|
||||
T Group(string groupName);
|
||||
|
||||
T GroupExcept(string groupName, IReadOnlyList<string> excludeIds);
|
||||
|
||||
T User(string userId);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,25 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
}
|
||||
}
|
||||
|
||||
public class GroupExceptProxy<THub> : IClientProxy
|
||||
{
|
||||
private readonly string _groupName;
|
||||
private readonly HubLifetimeManager<THub> _lifetimeManager;
|
||||
private readonly IReadOnlyList<string> _excludedIds;
|
||||
|
||||
public GroupExceptProxy(HubLifetimeManager<THub> lifetimeManager, string groupName, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
_lifetimeManager = lifetimeManager;
|
||||
_groupName = groupName;
|
||||
_excludedIds = excludedIds;
|
||||
}
|
||||
|
||||
public Task InvokeAsync(string method, params object[] args)
|
||||
{
|
||||
return _lifetimeManager.InvokeGroupExceptAsync(_groupName, method, args, _excludedIds);
|
||||
}
|
||||
}
|
||||
|
||||
public class AllClientProxy<THub> : IClientProxy
|
||||
{
|
||||
private readonly HubLifetimeManager<THub> _lifetimeManager;
|
||||
|
|
|
|||
|
|
@ -32,6 +32,11 @@ namespace Microsoft.AspNetCore.SignalR
|
|||
return TypedClientBuilder<T>.Build(_hubClients.Group(groupName));
|
||||
}
|
||||
|
||||
public T GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
|
||||
{
|
||||
return TypedClientBuilder<T>.Build(_hubClients.GroupExcept(groupName, excludeIds));
|
||||
}
|
||||
|
||||
public T User(string userId)
|
||||
{
|
||||
return TypedClientBuilder<T>.Build(_hubClients.User(userId));
|
||||
|
|
|
|||
|
|
@ -190,7 +190,19 @@ namespace Microsoft.AspNetCore.SignalR.Redis
|
|||
throw new ArgumentNullException(nameof(groupName));
|
||||
}
|
||||
|
||||
var message = new InvocationMessage(GetInvocationId(), nonBlocking: true, target: methodName, argumentBindingException: null, arguments: args);
|
||||
var message = new RedisExcludeClientsMessage(GetInvocationId(), nonBlocking: true, target: methodName, excludedIds: null, arguments: args);
|
||||
|
||||
return PublishAsync(_channelNamePrefix + ".group." + groupName, message);
|
||||
}
|
||||
|
||||
public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
if (groupName == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(groupName));
|
||||
}
|
||||
|
||||
var message = new RedisExcludeClientsMessage(GetInvocationId(), nonBlocking: true, target: methodName, excludedIds: excludedIds, arguments: args);
|
||||
|
||||
return PublishAsync(_channelNamePrefix + ".group." + groupName, message);
|
||||
}
|
||||
|
|
@ -547,11 +559,16 @@ namespace Microsoft.AspNetCore.SignalR.Redis
|
|||
{
|
||||
try
|
||||
{
|
||||
var message = DeserializeMessage<HubInvocationMessage>(data);
|
||||
var message = DeserializeMessage<RedisExcludeClientsMessage>(data);
|
||||
|
||||
var tasks = new List<Task>(group.Connections.Count);
|
||||
var tasks = new List<Task>();
|
||||
foreach (var groupConnection in group.Connections)
|
||||
{
|
||||
if (message.ExcludedIds?.Contains(groupConnection.ConnectionId) == true)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
tasks.Add(groupConnection.WriteAsync(message));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
|
|
@ -94,6 +95,37 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
|
|||
|
||||
await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout();
|
||||
|
||||
await AssertMessageAsync(client1);
|
||||
Assert.Null(client2.TryRead());
|
||||
|
||||
await connection1.DisposeAsync().OrTimeout();
|
||||
await connection2.DisposeAsync().OrTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeGroupExceptAsyncWritesToAllValidConnectionsInGroupOutput()
|
||||
{
|
||||
using (var client1 = new TestClient())
|
||||
using (var client2 = new TestClient())
|
||||
{
|
||||
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
|
||||
Options.Create(new RedisOptions()
|
||||
{
|
||||
Factory = t => new TestConnectionMultiplexer()
|
||||
}));
|
||||
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
|
||||
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
|
||||
|
||||
await manager.OnConnectedAsync(connection1).OrTimeout();
|
||||
await manager.OnConnectedAsync(connection2).OrTimeout();
|
||||
|
||||
await manager.AddGroupAsync(connection1.ConnectionId, "gunit").OrTimeout();
|
||||
await manager.AddGroupAsync(connection2.ConnectionId, "gunit").OrTimeout();
|
||||
|
||||
var excludedIds = new List<string>{ client2.Connection.ConnectionId };
|
||||
await manager.InvokeGroupExceptAsync("gunit", "Hello", new object[] { "World" }, excludedIds).OrTimeout();
|
||||
|
||||
await AssertMessageAsync(client1);
|
||||
|
||||
await connection1.DisposeAsync().OrTimeout();
|
||||
|
|
|
|||
|
|
@ -992,6 +992,58 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
}
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(HubTypes))]
|
||||
public async Task SendToGroupExcept(Type hubType)
|
||||
{
|
||||
var serviceProvider = CreateServiceProvider();
|
||||
|
||||
dynamic endPoint = serviceProvider.GetService(GetEndPointType(hubType));
|
||||
|
||||
using (var firstClient = new TestClient())
|
||||
using (var secondClient = new TestClient())
|
||||
{
|
||||
Task firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
|
||||
Task secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);
|
||||
|
||||
await Task.WhenAll(firstClient.Connected, secondClient.Connected).OrTimeout();
|
||||
|
||||
var result = (await firstClient.InvokeAsync("GroupSendMethod", "testGroup", "test").OrTimeout()).Result;
|
||||
|
||||
// check that 'firstConnection' hasn't received the group send
|
||||
Assert.Null(firstClient.TryRead());
|
||||
|
||||
// check that 'secondConnection' hasn't received the group send
|
||||
Assert.Null(secondClient.TryRead());
|
||||
|
||||
await firstClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "testGroup").OrTimeout();
|
||||
await secondClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "testGroup").OrTimeout();
|
||||
|
||||
var excludedIds = new List<string> { firstClient.Connection.ConnectionId };
|
||||
|
||||
await firstClient.SendInvocationAsync("GroupExceptSendMethod", "testGroup", "test", excludedIds).OrTimeout();
|
||||
|
||||
// check that 'secondConnection' has received the group send
|
||||
var hubMessage = await secondClient.ReadAsync().OrTimeout();
|
||||
var invocation = Assert.IsType<InvocationMessage>(hubMessage);
|
||||
Assert.Equal("Send", invocation.Target);
|
||||
Assert.Single(invocation.Arguments);
|
||||
Assert.Equal("test", invocation.Arguments[0]);
|
||||
|
||||
// Check that first client only got the completion message
|
||||
hubMessage = await firstClient.ReadAsync().OrTimeout();
|
||||
Assert.IsType<CompletionMessage>(hubMessage);
|
||||
|
||||
Assert.Null(firstClient.TryRead());
|
||||
|
||||
// kill the connections
|
||||
firstClient.Dispose();
|
||||
secondClient.Dispose();
|
||||
|
||||
await Task.WhenAll(firstEndPointTask, secondEndPointTask).OrTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RemoveFromGroupWhenNotInGroupDoesNotFail()
|
||||
{
|
||||
|
|
@ -1626,6 +1678,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
return Clients.Group(groupName).Send(message);
|
||||
}
|
||||
|
||||
public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
return Clients.GroupExcept(groupName, excludedIds).Send(message);
|
||||
}
|
||||
|
||||
public Task BroadcastMethod(string message)
|
||||
{
|
||||
return Clients.All.Broadcast(message);
|
||||
|
|
@ -1814,6 +1871,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
return Clients.Group(groupName).Send(message);
|
||||
}
|
||||
|
||||
public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
return Clients.GroupExcept(groupName, excludedIds).Send(message);
|
||||
}
|
||||
|
||||
public Task BroadcastMethod(string message)
|
||||
{
|
||||
return Clients.All.Broadcast(message);
|
||||
|
|
@ -1945,6 +2007,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
|
|||
return Clients.Group(groupName).InvokeAsync("Send", message);
|
||||
}
|
||||
|
||||
public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
|
||||
{
|
||||
return Clients.GroupExcept(groupName, excludedIds).InvokeAsync("Send", message);
|
||||
}
|
||||
|
||||
public Task BroadcastMethod(string message)
|
||||
{
|
||||
return Clients.All.InvokeAsync("Broadcast", message);
|
||||
|
|
|
|||
Loading…
Reference in New Issue