From 23eb8aa7bd68d4481179bbff236598206747de05 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Thu, 3 Nov 2016 21:07:04 -0700 Subject: [PATCH] More redis tweaks - Implemented the task queue - Only subscribe to user channel if authenticated --- .../RedisHubLifetimeManager.cs | 87 ++++++++++++------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 78f244f83c..e15ff0082e 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -36,18 +36,20 @@ namespace Microsoft.AspNetCore.SignalR.Redis _redisServerConnection = _options.Connect(writer); _bus = _redisServerConnection.GetSubscriber(); - _bus.Subscribe(typeof(THub).FullName, (c, data) => + var previousBroadcastTask = Task.CompletedTask; + + _bus.Subscribe(typeof(THub).FullName, async (c, data) => { + await previousBroadcastTask; + var tasks = new List(_connections.Count); - // TODO: serialize once per format by providing a different stream? foreach (var connection in _connections) { tasks.Add(connection.Channel.Output.WriteAsync((byte[])data)); } - // TODO: Task Queue - Task.WhenAll(tasks).GetAwaiter().GetResult(); + previousBroadcastTask = Task.WhenAll(tasks); }); } @@ -111,43 +113,56 @@ namespace Microsoft.AspNetCore.SignalR.Redis public override Task OnConnectedAsync(Connection connection) { + var redisSubscriptions = connection.Metadata.GetOrAdd("redis_subscriptions", _ => new HashSet()); + var connectionTask = Task.CompletedTask; + var userTask = Task.CompletedTask; + _connections.Add(connection); var connectionChannel = typeof(THub).FullName + "." + connection.ConnectionId; - var userChannel = typeof(THub).FullName + "." + connection.User.Identity.Name; - - var task1 = _bus.SubscribeAsync(connectionChannel, (c, data) => - { - // TODO: serialize once per format by providing a different stream? - // TODO: Task Queue - connection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult(); - }); - - var task2 = _bus.SubscribeAsync(userChannel, (c, data) => - { - // TODO: serialize once per format by providing a different stream? - // TODO: Task Queue - // TODO: Look at optimizing (looping over connections checking for Name) - connection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult(); - }); - - var redisSubscriptions = connection.Metadata.GetOrAdd("redis_subscriptions", _ => new HashSet()); redisSubscriptions.Add(connectionChannel); - redisSubscriptions.Add(userChannel); - return Task.WhenAll(task1, task2); + var previousConnectionTask = Task.CompletedTask; + + connectionTask = _bus.SubscribeAsync(connectionChannel, async (c, data) => + { + await previousConnectionTask; + + previousConnectionTask = connection.Channel.Output.WriteAsync((byte[])data); + }); + + + if (connection.User.Identity.IsAuthenticated) + { + var userChannel = typeof(THub).FullName + "." + connection.User.Identity.Name; + redisSubscriptions.Add(userChannel); + + var previousUserTask = Task.CompletedTask; + + // TODO: Look at optimizing (looping over connections checking for Name) + userTask = _bus.SubscribeAsync(userChannel, async (c, data) => + { + await previousUserTask; + + previousUserTask = connection.Channel.Output.WriteAsync((byte[])data); + }); + } + + return Task.WhenAll(connectionTask, userTask); } - public override async Task OnDisconnectedAsync(Connection connection) + public override Task OnDisconnectedAsync(Connection connection) { _connections.Remove(connection); + var tasks = new List(); + var redisSubscriptions = connection.Metadata.Get>("redis_subscriptions"); if (redisSubscriptions != null) { foreach (var subscription in redisSubscriptions) { - await _bus.UnsubscribeAsync(subscription); + tasks.Add(_bus.UnsubscribeAsync(subscription)); } } @@ -157,9 +172,11 @@ namespace Microsoft.AspNetCore.SignalR.Redis { foreach (var group in groupNames) { - await RemoveGroupAsync(connection, group); + tasks.Add(RemoveGroupAsync(connection, group)); } } + + return Task.WhenAll(tasks); } public override async Task AddGroupAsync(Connection connection, string groupName) @@ -182,14 +199,22 @@ namespace Microsoft.AspNetCore.SignalR.Redis return; } - await _bus.SubscribeAsync(groupChannel, (c, data) => + var previousTask = Task.CompletedTask; + + await _bus.SubscribeAsync(groupChannel, async (c, data) => { + // Since this callback is async, we await the previous task then + // before sending the current message. This is because we don't + // want to do concurrent writes to the outgoing connections + await previousTask; + + var tasks = new List(group.Connections.Count); foreach (var groupConnection in group.Connections) { - // TODO: serialize once per format by providing a different stream? - // TODO: Task Queue - groupConnection.Channel.Output.WriteAsync((byte[])data).GetAwaiter().GetResult(); + tasks.Add(groupConnection.Channel.Output.WriteAsync((byte[])data)); } + + previousTask = Task.WhenAll(tasks); }); } finally