From 06e3a08ac06b31fff9fdc68b7390b57966114b13 Mon Sep 17 00:00:00 2001 From: Pawel Kadluczka Date: Wed, 3 May 2017 17:04:26 -0700 Subject: [PATCH] Multi-server --- samples/ChatSample/HubWithPresence.cs | 4 +- samples/ChatSample/Hubs/Chat.cs | 15 +- samples/ChatSample/IUserTracker.cs | 4 +- samples/ChatSample/InMemoryUserTracker.cs | 8 +- .../ChatSample/PresenceHubLifetimeManager.cs | 32 +- samples/ChatSample/RedisUserTracker.cs | 287 +++++++++++++++--- samples/ChatSample/Views/Home/Index.cshtml | 25 +- 7 files changed, 302 insertions(+), 73 deletions(-) diff --git a/samples/ChatSample/HubWithPresence.cs b/samples/ChatSample/HubWithPresence.cs index 713c2c671b..ca8f0641a5 100644 --- a/samples/ChatSample/HubWithPresence.cs +++ b/samples/ChatSample/HubWithPresence.cs @@ -28,12 +28,12 @@ namespace ChatSample return _userTracker.UsersOnline(); } - public virtual Task OnUserJoined(UserDetails user) + public virtual Task OnUsersJoined(UserDetails[] user) { return Task.CompletedTask; } - public virtual Task OnUserLeft(UserDetails user) + public virtual Task OnUsersLeft(UserDetails[] user) { return Task.CompletedTask; } diff --git a/samples/ChatSample/Hubs/Chat.cs b/samples/ChatSample/Hubs/Chat.cs index 39ab6ba40d..2b9919ac70 100644 --- a/samples/ChatSample/Hubs/Chat.cs +++ b/samples/ChatSample/Hubs/Chat.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Authorization; @@ -24,22 +25,18 @@ namespace ChatSample.Hubs } await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await GetUsersOnline()); + await base.OnConnectedAsync(); } - public override Task OnUserJoined(UserDetails user) + public override Task OnUsersJoined(UserDetails[] users) { - if (user.ConnectionId != Context.ConnectionId) - { - return Clients.Client(Context.ConnectionId).InvokeAsync("UserJoined", user); - } - - return Task.CompletedTask; + return Clients.Client(Context.ConnectionId).InvokeAsync("UsersJoined", new[] { users }); } - public override Task OnUserLeft(UserDetails user) + public override Task OnUsersLeft(UserDetails[] users) { - return Clients.Client(Context.ConnectionId).InvokeAsync("UserLeft", user); + return Clients.Client(Context.ConnectionId).InvokeAsync("UsersLeft", new[] { users }); } public async Task Send(string message) diff --git a/samples/ChatSample/IUserTracker.cs b/samples/ChatSample/IUserTracker.cs index ced798de50..c55e22f8c5 100644 --- a/samples/ChatSample/IUserTracker.cs +++ b/samples/ChatSample/IUserTracker.cs @@ -14,7 +14,7 @@ namespace ChatSample Task AddUser(Connection connection, UserDetails userDetails); Task RemoveUser(Connection connection); - event Action UserJoined; - event Action UserLeft; + event Action UsersJoined; + event Action UsersLeft; } } diff --git a/samples/ChatSample/InMemoryUserTracker.cs b/samples/ChatSample/InMemoryUserTracker.cs index e4911d44e3..ce7877618c 100644 --- a/samples/ChatSample/InMemoryUserTracker.cs +++ b/samples/ChatSample/InMemoryUserTracker.cs @@ -12,8 +12,8 @@ namespace ChatSample private readonly ConcurrentDictionary _usersOnline = new ConcurrentDictionary(); - public event Action UserJoined; - public event Action UserLeft; + public event Action UsersJoined; + public event Action UsersLeft; public Task> UsersOnline() => Task.FromResult(_usersOnline.Values.AsEnumerable()); @@ -21,7 +21,7 @@ namespace ChatSample public Task AddUser(Connection connection, UserDetails userDetails) { _usersOnline.TryAdd(connection, userDetails); - UserJoined(userDetails); + UsersJoined(new[] { userDetails }); return Task.CompletedTask; } @@ -30,7 +30,7 @@ namespace ChatSample { if (_usersOnline.TryRemove(connection, out var userDetails)) { - UserLeft(userDetails); + UsersLeft(new[] { userDetails }); } return Task.CompletedTask; diff --git a/samples/ChatSample/PresenceHubLifetimeManager.cs b/samples/ChatSample/PresenceHubLifetimeManager.cs index 801bf9a07f..c918b81e41 100644 --- a/samples/ChatSample/PresenceHubLifetimeManager.cs +++ b/samples/ChatSample/PresenceHubLifetimeManager.cs @@ -8,6 +8,7 @@ using Microsoft.AspNetCore.Sockets; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.AspNetCore.SignalR.Redis; +using System.Linq; namespace ChatSample { @@ -47,8 +48,8 @@ namespace ChatSample ILoggerFactory loggerFactory, IServiceProvider serviceProvider) { _userTracker = userTracker; - _userTracker.UserJoined += OnUserJoined; - _userTracker.UserLeft += OnUserLeft; + _userTracker.UsersJoined += OnUsersJoined; + _userTracker.UsersLeft += OnUsersLeft; _serviceScopeFactory = serviceScopeFactory; _serviceProvider = serviceProvider; @@ -70,14 +71,29 @@ namespace ChatSample await _userTracker.RemoveUser(connection); } - private async void OnUserJoined(UserDetails userDetails) + private async void OnUsersJoined(UserDetails[] users) { - await Notify(hub => hub.OnUserJoined(userDetails)); + await Notify(hub => + { + if (users.Length == 1) + { + if (users[0].ConnectionId != hub.Context.ConnectionId) + { + return hub.OnUsersJoined(users); + } + } + else + { + return hub.OnUsersJoined( + users.Where(u => u.ConnectionId != hub.Context.Connection.ConnectionId).ToArray()); + } + return Task.CompletedTask; + }); } - private async void OnUserLeft(UserDetails userDetails) + private async void OnUsersLeft(UserDetails[] users) { - await Notify(hub => hub.OnUserLeft(userDetails)); + await Notify(hub => hub.OnUsersLeft(users)); } private async Task Notify(Func invocation) @@ -117,8 +133,8 @@ namespace ChatSample public void Dispose() { - _userTracker.UserJoined -= OnUserJoined; - _userTracker.UserLeft -= OnUserLeft; + _userTracker.UsersJoined -= OnUsersJoined; + _userTracker.UsersLeft -= OnUsersLeft; } public override Task InvokeAllAsync(string methodName, object[] args) diff --git a/samples/ChatSample/RedisUserTracker.cs b/samples/ChatSample/RedisUserTracker.cs index fd42e28f55..9083cae2fa 100644 --- a/samples/ChatSample/RedisUserTracker.cs +++ b/samples/ChatSample/RedisUserTracker.cs @@ -7,6 +7,7 @@ using System.IO; using System.Linq; using System.Net; using System.Text; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Redis; using Microsoft.AspNetCore.Sockets; @@ -21,45 +22,83 @@ namespace ChatSample { private readonly string ServerId = $"server:{Guid.NewGuid().ToString("D")}"; private readonly RedisKey ServerIndexRedisKey = "ServerIndex"; + private readonly RedisKey LastSeenRedisKey; private readonly RedisKey UserIndexRedisKey; - private readonly int _redisDatabase; + + private const int ScanInterval = 5; //seconds + private const int ServerInactivityTimeout = 30; // seconds + private readonly ConnectionMultiplexer _redisConnection; + private readonly IDatabase _redisDatabase; private readonly ISubscriber _redisSubscriber; - private readonly ILogger _logger; private const string UserAddedChannelName = "UserAdded"; private const string UserRemovedChannelName = "UserRemoved"; private readonly RedisChannel _userAddedChannel; private readonly RedisChannel _userRemovedChannel; - public event Action UserJoined; - public event Action UserLeft; + private readonly ILogger _logger; + + private HashSet _serverIds = new HashSet(); + private readonly UserEqualityComparer _userEqualityComparer = new UserEqualityComparer(); + private HashSet _users; + private readonly object _lockObj = new object(); + private readonly SemaphoreSlim _userSyncSempaphore = new SemaphoreSlim(initialCount: 1); + + private readonly Timer _timer; + + public event Action UsersJoined; + public event Action UsersLeft; public RedisUserTracker(IOptions options, ILoggerFactory loggerFactory) { + LastSeenRedisKey = $"{ServerId}:last-seen"; UserIndexRedisKey = $"{ServerId}:users"; + _users = new HashSet(_userEqualityComparer); _logger = loggerFactory.CreateLogger>(); - _redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault(); - _redisConnection = StartRedisConnection(options.Value); + (_redisConnection, _redisDatabase) = StartRedisConnection(options.Value); + + _timer = new Timer(Scan, this, TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(ScanInterval)); + + _logger.LogInformation("Started RedisUserTracker with Id: {0}", ServerId); _redisSubscriber = _redisConnection.GetSubscriber(); _userAddedChannel = new RedisChannel(UserAddedChannelName, RedisChannel.PatternMode.Literal); _userRemovedChannel = new RedisChannel(UserRemovedChannelName, RedisChannel.PatternMode.Literal); - _redisSubscriber.Subscribe(_userAddedChannel, (channel, value) => UserJoined(DeserializerUser(value))); - _redisSubscriber.Subscribe(_userRemovedChannel, (channel, value) => UserLeft(DeserializerUser(value))); + _redisSubscriber.Subscribe(_userAddedChannel, (channel, value) => + { + var user = DeserializerUser(value); + lock (_lockObj) + { + _users.Add(user); + } + UsersJoined(new[] { user }); + }); + + _redisSubscriber.Subscribe(_userRemovedChannel, (channel, value) => + { + var user = DeserializerUser(value); + lock (_lockObj) + { + _users.Remove(user); + } + + UsersLeft(new[] { user }); + }); } - private ConnectionMultiplexer StartRedisConnection(RedisOptions options) + private (ConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options) { // TODO: handle connection failures var redisConnection = ConnectToRedis(options, _logger); + var redisDatabase = redisConnection.GetDatabase(options.Options.DefaultDatabase.GetValueOrDefault()); // Register connection - var database = redisConnection.GetDatabase(_redisDatabase); - database.SetAdd(ServerIndexRedisKey, ServerId); + redisDatabase.SetAdd(ServerIndexRedisKey, ServerId); + redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks); - return redisConnection; + return (redisConnection, redisDatabase); } private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) @@ -82,49 +121,204 @@ namespace ChatSample return ConnectionMultiplexer.Connect(configurationOptions, loggerTextWriter); } + public Task> UsersOnline() + { + lock(_lockObj) + { + return Task.FromResult(_users.ToArray().AsEnumerable()); + } + } + public async Task AddUser(Connection connection, UserDetails userDetails) { - var database = _redisConnection.GetDatabase(_redisDatabase); var key = GetUserRedisKey(connection); var user = SerializeUser(connection); - // need to await to make sure user is added before we call into the Hub - await database.StringSetAsync(key, SerializeUser(connection)); - await database.SetAddAsync(UserIndexRedisKey, key); - _ = _redisSubscriber.PublishAsync(_userAddedChannel, user); + + await _userSyncSempaphore.WaitAsync(); + + try + { + await _redisDatabase.ScriptEvaluateAsync( + @"redis.call('set', KEYS[1], ARGV[1]) + redis.call('sadd', KEYS[2], KEYS[1])", + new RedisKey[] { key, UserIndexRedisKey }, + new RedisValue[] { SerializeUser(connection) }); + + lock (_lockObj) + { + _users.Add(userDetails); + } + _ = _redisSubscriber.PublishAsync(_userAddedChannel, user); + } + finally + { + _userSyncSempaphore.Release(); + } } public async Task RemoveUser(Connection connection) { - var database = _redisConnection.GetDatabase(_redisDatabase); - await database.SetRemoveAsync(UserIndexRedisKey, connection.ConnectionId); - if (await database.KeyDeleteAsync(GetUserRedisKey(connection))) + await _userSyncSempaphore.WaitAsync(); + try { - _ = _redisSubscriber.PublishAsync(_userRemovedChannel, SerializeUser(connection)); + var userKey = GetUserRedisKey(connection); + await _redisDatabase.SetRemoveAsync(UserIndexRedisKey, userKey); + if (await _redisDatabase.KeyDeleteAsync(userKey)) + { + lock (_lockObj) + { + // TODO: remove without creating the object + _users.Remove(new UserDetails(connection.ConnectionId, name: null)); + } + + _ = _redisSubscriber.PublishAsync(_userRemovedChannel, SerializeUser(connection)); + } } - } - - public async Task> UsersOnline() - { - var database = _redisConnection.GetDatabase(_redisDatabase); - - var userIds = await database.ScriptEvaluateAsync( - @"local keys = { } - for i, key in pairs(redis.call('smembers', KEYS[1])) do - table.insert(keys, key.. ':users') - end - return redis.call('sunion', unpack(keys))", new[] { ServerIndexRedisKey }); - - if (!userIds.IsNull) + finally { - var users = await database.StringGetAsync(((RedisValue[])userIds).Select(id => (RedisKey)(string)id).ToArray()); - return users.Where(user => !user.IsNull).Select(user => DeserializerUser(user)); + _userSyncSempaphore.Release(); } - - return Enumerable.Empty(); } private static string GetUserRedisKey(Connection connection) => $"user:{connection.ConnectionId}"; + private static void Scan(object state) + { + _ = ((RedisUserTracker)state).Scan(); + } + + private async Task Scan() + { + try + { + _logger.LogDebug("Scanning for presence changes"); + + _redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks); + await RemoveExpiredServers(); + await CheckForServerChanges(); + + _logger.LogDebug("Completed scanning for presence changes"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error while checking presence changes."); + } + } + + private async Task RemoveExpiredServers() + { + // remove expired servers from server index + var expiredServers = await _redisDatabase.ScriptEvaluateAsync( + @"local expired_servers = { } + local count = 0 + for _, server_key in pairs(redis.call('smembers', KEYS[1])) do + local last_seen = tonumber(redis.call('get', server_key..':last-seen')) + if last_seen ~= nil and tonumber(ARGV[1]) - last_seen > tonumber(ARGV[2]) then + table.insert(expired_servers, server_key) + count = count + 1 + end + end + + if count > 0 then + redis.call('srem', KEYS[1], unpack(expired_servers)) + end + return expired_servers", + new[] { ServerIndexRedisKey }, + new RedisValue[] { DateTimeOffset.UtcNow.Ticks, TimeSpan.FromSeconds(ServerInactivityTimeout).Ticks }); + + // remove users + // TODO: this will probably have to be atomic with the previous script in case a server rejoins and populates + // the list of users + foreach (string expiredServerKey in (RedisValue[])expiredServers) + { + await _redisDatabase.ScriptEvaluateAsync( + @"local key = KEYS[1] + if redis.call('exists', key) == 1 then + redis.call('del', unpack(redis.call('smembers', key))) + end + redis.call('del', key..':last-seen', key..':users')", + new RedisKey[] { expiredServerKey }); + } + + if (((RedisValue[])expiredServers).Any()) + { + _logger.LogInformation("Removed entries for expired servers. {0}", + string.Join(",", (RedisValue[])expiredServers)); + } + } + + private async Task CheckForServerChanges() + { + var activeServers = new HashSet((await _redisDatabase.SetMembersAsync(ServerIndexRedisKey)).Select(v=>(string)v)); + + var synchronizeUsers = false; + lock (_lockObj) + { + if (activeServers.Count != _serverIds.Count || activeServers.Any(i => !_serverIds.Contains(i))) + { + _serverIds = activeServers; + synchronizeUsers = true; + } + } + + if (synchronizeUsers) + { + await SynchronizeUsers(); + } + } + + private async Task SynchronizeUsers() + { + + await _userSyncSempaphore.WaitAsync(); + try + { + + + var remoteUsersJson = await _redisDatabase.ScriptEvaluateAsync( + @"local server_keys = { } + for _, key in pairs(redis.call('smembers', KEYS[1])) do + table.insert(server_keys, key.. ':users') + end + local user_keys = redis.call('sunion', unpack(server_keys)) + local users = { } + if next(user_keys) ~= nil then + users = redis.call('mget', unpack(user_keys)) + end + return users + ", new[] { ServerIndexRedisKey }); + + var remoteUsers = new HashSet( + ((RedisValue[])remoteUsersJson) + .Where(u => u.HasValue) + .Select(userJson => DeserializerUser(userJson)), _userEqualityComparer); + + UserDetails[] newUsers, zombieUsers; + lock (_lockObj) + { + newUsers = remoteUsers.Except(_users, _userEqualityComparer).ToArray(); + zombieUsers = _users.Except(remoteUsers, _userEqualityComparer).ToArray(); + _users = remoteUsers; + } + + if (zombieUsers.Any()) + { + _logger.LogDebug("Removing zombie users: {0}", string.Join(",", zombieUsers.Select(u => u.ConnectionId))); + UsersLeft(zombieUsers); + } + + if (newUsers.Any()) + { + _logger.LogDebug("Adding new users: {0}", string.Join(",", newUsers.Select(u => u.ConnectionId))); + UsersJoined(newUsers); + } + } + finally + { + _userSyncSempaphore.Release(); + } + } + private static string SerializeUser(Connection connection) => $"{{ \"ConnectionID\": \"{connection.ConnectionId}\", \"Name\": \"{connection.User.Identity.Name}\" }}"; @@ -133,7 +327,22 @@ namespace ChatSample public void Dispose() { + _timer.Dispose(); _redisSubscriber.UnsubscribeAll(); + _redisConnection.Dispose(); + } + + private class UserEqualityComparer : IEqualityComparer + { + public bool Equals(UserDetails u1, UserDetails u2) + { + return ReferenceEquals(u1, u2) || u1.ConnectionId == u2.ConnectionId; + } + + public int GetHashCode(UserDetails u) + { + return u.ConnectionId.GetHashCode(); + } } private class LoggerTextWriter : TextWriter diff --git a/samples/ChatSample/Views/Home/Index.cshtml b/samples/ChatSample/Views/Home/Index.cshtml index 89b01f6800..6311044cff 100644 --- a/samples/ChatSample/Views/Home/Index.cshtml +++ b/samples/ChatSample/Views/Home/Index.cshtml @@ -28,18 +28,22 @@ connection.onClosed = e => { } }; -connection.on('SetUsersOnline', function (usersOnline) { +connection.on('SetUsersOnline', usersOnline => { usersOnline.forEach(user => addUserOnline(user)); }); -connection.on('UserJoined', user => { - appendLine('User ' + user.Name + ' joined the chat'); - addUserOnline(user); +connection.on('UsersJoined', users => { + users.forEach(user => { + appendLine('User ' + user.Name + ' joined the chat'); + addUserOnline(user); + }); }); -connection.on('UserLeft', user => { - appendLine('User ' + user.Name + ' left the chat'); - document.getElementById(user.ConnectionId).outerHTML = ''; +connection.on('UsersLeft', users => { + users.forEach(user => { + appendLine('User ' + user.Name + ' left the chat'); + document.getElementById(user.ConnectionId).outerHTML = ''; + }); }); connection.on('Send', (userName, message) => { @@ -72,9 +76,12 @@ function appendLine(line, color) { document.getElementById('messages').appendChild(child); }; -function addUserOnline (user) { +function addUserOnline(user) { + if (document.getElementById(user.ConnectionId)) { + return; + } var userLi = document.createElement('li'); - userLi.innerText = user.Name; + userLi.innerText = `user.Name (${user.ConnectionId})`; userLi.id = user.ConnectionId; document.getElementById('users').appendChild(userLi); }