[Redis] Remove groups properly from connection diconnection (#1010)

And properly short circuit group actions for local connections
This commit is contained in:
BrennanConroy 2017-10-17 14:15:00 -07:00 committed by GitHub
parent af286c81bb
commit 878a70226c
1 changed files with 29 additions and 33 deletions

View File

@ -119,22 +119,21 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
var groupMessage = DeserializeMessage<GroupMessage>(data);
var connection = _connections[groupMessage.ConnectionId];
if (connection == null)
{
// user not on this server
return;
}
if (groupMessage.Action == GroupAction.Remove)
{
if (!await RemoveGroupAsyncCore(groupMessage.ConnectionId, groupMessage.Group))
{
// user not on this server
return;
}
await RemoveGroupAsyncCore(connection, groupMessage.Group);
}
if (groupMessage.Action == GroupAction.Add)
{
if (!await AddGroupAsyncCore(groupMessage.ConnectionId, groupMessage.Group))
{
// user not on this server
return;
}
await AddGroupAsyncCore(connection, groupMessage.Group);
}
// Sending ack to server that sent the original add/remove
@ -300,7 +299,9 @@ namespace Microsoft.AspNetCore.SignalR.Redis
// in RemoveGroupAsync
foreach (var group in groupNames.ToArray())
{
tasks.Add(RemoveGroupAsync(connection.ConnectionId, group));
// Use RemoveGroupAsyncCore because the connection is local and we don't want to
// accidentally go to other servers with our remove request.
tasks.Add(RemoveGroupAsyncCore(connection, group));
}
}
@ -319,23 +320,19 @@ namespace Microsoft.AspNetCore.SignalR.Redis
throw new ArgumentNullException(nameof(groupName));
}
if (await AddGroupAsyncCore(connectionId, groupName))
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
await AddGroupAsyncCore(connection, groupName);
return;
}
await SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
}
private async Task<bool> AddGroupAsyncCore(string connectionId, string groupName)
private async Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var connection = _connections[connectionId];
if (connection == null)
{
return false;
}
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
@ -355,7 +352,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
// Subscribe once
if (group.Connections.Count > 1)
{
return true;
return;
}
var previousTask = Task.CompletedTask;
@ -383,8 +380,6 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
group.Lock.Release();
}
return true;
}
public override async Task RemoveGroupAsync(string connectionId, string groupName)
@ -399,29 +394,30 @@ namespace Microsoft.AspNetCore.SignalR.Redis
throw new ArgumentNullException(nameof(groupName));
}
if (await RemoveGroupAsyncCore(connectionId, groupName))
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
await RemoveGroupAsyncCore(connection, groupName);
return;
}
await SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
}
private async Task<bool> RemoveGroupAsyncCore(string connectionId, string groupName)
/// <summary>
/// This takes <see cref="HubConnectionContext"/> because we want to remove the connection from the
/// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
/// </summary>
private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var groupChannel = _channelNamePrefix + ".group." + groupName;
GroupData group;
if (!_groups.TryGetValue(groupChannel, out group))
{
return false;
}
var connection = _connections[connectionId];
if (connection == null)
{
return false;
return;
}
var feature = connection.Features.Get<IRedisFeature>();
@ -453,7 +449,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
group.Lock.Release();
}
return true;
return;
}
private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
@ -501,7 +497,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
using (var reader = new JsonTextReader(new StreamReader(new MemoryStream(data))))
{
return (T)_serializer.Deserialize(reader);
return _serializer.Deserialize<T>(reader);
}
}