More redis tweaks

- Implemented the task queue
- Only subscribe to user channel if authenticated
This commit is contained in:
David Fowler 2016-11-03 21:07:04 -07:00
parent ed41672381
commit 23eb8aa7bd
1 changed files with 56 additions and 31 deletions

View File

@ -36,18 +36,20 @@ namespace Microsoft.AspNetCore.SignalR.Redis
_redisServerConnection = _options.Connect(writer);
_bus = _redisServerConnection.GetSubscriber();
_bus.Subscribe(typeof(THub).FullName, (c, data) =>
var previousBroadcastTask = Task.CompletedTask;
_bus.Subscribe(typeof(THub).FullName, async (c, data) =>
{
await previousBroadcastTask;
var tasks = new List<Task>(_connections.Count);
// TODO: serialize once per format by providing a different stream?
foreach (var connection in _connections)
{
tasks.Add(connection.Channel.Output.WriteAsync((byte[])data));
}
// TODO: Task Queue
Task.WhenAll(tasks).GetAwaiter().GetResult();
previousBroadcastTask = Task.WhenAll(tasks);
});
}
@ -111,43 +113,56 @@ namespace Microsoft.AspNetCore.SignalR.Redis
public override Task OnConnectedAsync(Connection connection)
{
var redisSubscriptions = connection.Metadata.GetOrAdd("redis_subscriptions", _ => new HashSet<string>());
var connectionTask = Task.CompletedTask;
var userTask = Task.CompletedTask;
_connections.Add(connection);
var connectionChannel = typeof(THub).FullName + "." + connection.ConnectionId;
var userChannel = typeof(THub).FullName + "." + connection.User.Identity.Name;
var task1 = _bus.SubscribeAsync(connectionChannel, (c, data) =>
{
// TODO: serialize once per format by providing a different stream?
// TODO: Task Queue
connection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult();
});
var task2 = _bus.SubscribeAsync(userChannel, (c, data) =>
{
// TODO: serialize once per format by providing a different stream?
// TODO: Task Queue
// TODO: Look at optimizing (looping over connections checking for Name)
connection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult();
});
var redisSubscriptions = connection.Metadata.GetOrAdd("redis_subscriptions", _ => new HashSet<string>());
redisSubscriptions.Add(connectionChannel);
redisSubscriptions.Add(userChannel);
return Task.WhenAll(task1, task2);
var previousConnectionTask = Task.CompletedTask;
connectionTask = _bus.SubscribeAsync(connectionChannel, async (c, data) =>
{
await previousConnectionTask;
previousConnectionTask = connection.Channel.Output.WriteAsync((byte[])data);
});
if (connection.User.Identity.IsAuthenticated)
{
var userChannel = typeof(THub).FullName + "." + connection.User.Identity.Name;
redisSubscriptions.Add(userChannel);
var previousUserTask = Task.CompletedTask;
// TODO: Look at optimizing (looping over connections checking for Name)
userTask = _bus.SubscribeAsync(userChannel, async (c, data) =>
{
await previousUserTask;
previousUserTask = connection.Channel.Output.WriteAsync((byte[])data);
});
}
return Task.WhenAll(connectionTask, userTask);
}
public override async Task OnDisconnectedAsync(Connection connection)
public override Task OnDisconnectedAsync(Connection connection)
{
_connections.Remove(connection);
var tasks = new List<Task>();
var redisSubscriptions = connection.Metadata.Get<HashSet<string>>("redis_subscriptions");
if (redisSubscriptions != null)
{
foreach (var subscription in redisSubscriptions)
{
await _bus.UnsubscribeAsync(subscription);
tasks.Add(_bus.UnsubscribeAsync(subscription));
}
}
@ -157,9 +172,11 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
foreach (var group in groupNames)
{
await RemoveGroupAsync(connection, group);
tasks.Add(RemoveGroupAsync(connection, group));
}
}
return Task.WhenAll(tasks);
}
public override async Task AddGroupAsync(Connection connection, string groupName)
@ -182,14 +199,22 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return;
}
await _bus.SubscribeAsync(groupChannel, (c, data) =>
var previousTask = Task.CompletedTask;
await _bus.SubscribeAsync(groupChannel, async (c, data) =>
{
// Since this callback is async, we await the previous task then
// before sending the current message. This is because we don't
// want to do concurrent writes to the outgoing connections
await previousTask;
var tasks = new List<Task>(group.Connections.Count);
foreach (var groupConnection in group.Connections)
{
// TODO: serialize once per format by providing a different stream?
// TODO: Task Queue
groupConnection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult();
tasks.Add(groupConnection.Channel.Output.WriteAsync((byte[])data));
}
previousTask = Task.WhenAll(tasks);
});
}
finally