From cf2d6ce697e1ab3f87e3a4fc32dfad672c612efd Mon Sep 17 00:00:00 2001 From: Pawel Kadluczka Date: Thu, 27 Apr 2017 10:01:12 -0700 Subject: [PATCH] Redis presence manager --- samples/ChatSample/HubWithPresence.cs | 8 +- samples/ChatSample/Hubs/Chat.cs | 3 +- samples/ChatSample/IUserTracker.cs | 4 +- samples/ChatSample/InMemoryUserTracker.cs | 10 +- .../ChatSample/PresenceHubLifetimeManager.cs | 15 ++- ...PresenceManager.cs => RedisUserTracker.cs} | 117 +++++------------- samples/ChatSample/Startup.cs | 10 +- 7 files changed, 68 insertions(+), 99 deletions(-) rename samples/ChatSample/{RedisPresenceManager.cs => RedisUserTracker.cs} (50%) diff --git a/samples/ChatSample/HubWithPresence.cs b/samples/ChatSample/HubWithPresence.cs index 283655ec27..713c2c671b 100644 --- a/samples/ChatSample/HubWithPresence.cs +++ b/samples/ChatSample/HubWithPresence.cs @@ -1,7 +1,6 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using System; using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; @@ -24,12 +23,9 @@ namespace ChatSample _userTracker = userTracker; } - public Task> UsersOnline + public Task> GetUsersOnline() { - get - { - return _userTracker.UsersOnline(); - } + return _userTracker.UsersOnline(); } public virtual Task OnUserJoined(UserDetails user) diff --git a/samples/ChatSample/Hubs/Chat.cs b/samples/ChatSample/Hubs/Chat.cs index 4525d92b62..39ab6ba40d 100644 --- a/samples/ChatSample/Hubs/Chat.cs +++ b/samples/ChatSample/Hubs/Chat.cs @@ -20,9 +20,10 @@ namespace ChatSample.Hubs if (!Context.User.Identity.IsAuthenticated) { Context.Connection.Dispose(); + return; } - await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await UsersOnline); + await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await GetUsersOnline()); await base.OnConnectedAsync(); } diff --git a/samples/ChatSample/IUserTracker.cs b/samples/ChatSample/IUserTracker.cs index efcebacfd4..ced798de50 100644 --- a/samples/ChatSample/IUserTracker.cs +++ b/samples/ChatSample/IUserTracker.cs @@ -11,8 +11,8 @@ namespace ChatSample public interface IUserTracker { Task> UsersOnline(); - Task AddUser(Connection connection, UserDetails user); - Task RemoveUser(Connection connection); + Task AddUser(Connection connection, UserDetails userDetails); + Task RemoveUser(Connection connection); event Action UserJoined; event Action UserLeft; diff --git a/samples/ChatSample/InMemoryUserTracker.cs b/samples/ChatSample/InMemoryUserTracker.cs index 67668502fa..e4911d44e3 100644 --- a/samples/ChatSample/InMemoryUserTracker.cs +++ b/samples/ChatSample/InMemoryUserTracker.cs @@ -18,22 +18,22 @@ namespace ChatSample public Task> UsersOnline() => Task.FromResult(_usersOnline.Values.AsEnumerable()); - public Task AddUser(Connection connection, UserDetails user) + public Task AddUser(Connection connection, UserDetails userDetails) { - _usersOnline.TryAdd(connection, user); - UserJoined(user); + _usersOnline.TryAdd(connection, userDetails); + UserJoined(userDetails); return Task.CompletedTask; } - public Task RemoveUser(Connection connection) + public Task RemoveUser(Connection connection) { if (_usersOnline.TryRemove(connection, out var userDetails)) { UserLeft(userDetails); } - return Task.FromResult(userDetails); + return Task.CompletedTask; } } } diff --git a/samples/ChatSample/PresenceHubLifetimeManager.cs b/samples/ChatSample/PresenceHubLifetimeManager.cs index ddb7bb210b..801bf9a07f 100644 --- a/samples/ChatSample/PresenceHubLifetimeManager.cs +++ b/samples/ChatSample/PresenceHubLifetimeManager.cs @@ -1,10 +1,13 @@ - +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + using System; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.Sockets; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.SignalR.Redis; namespace ChatSample { @@ -18,6 +21,16 @@ namespace ChatSample } } + public class RedisPresenceHublifetimeMenager : PresenceHubLifetimeManager> + where THub : HubWithPresence + { + public RedisPresenceHublifetimeMenager(IUserTracker userTracker, IServiceScopeFactory serviceScopeFactory, + ILoggerFactory loggerFactory, IServiceProvider serviceProvider) + : base(userTracker, serviceScopeFactory, loggerFactory, serviceProvider) + { + } + } + public class PresenceHubLifetimeManager : HubLifetimeManager, IDisposable where THubLifetimeManager : HubLifetimeManager where THub : HubWithPresence diff --git a/samples/ChatSample/RedisPresenceManager.cs b/samples/ChatSample/RedisUserTracker.cs similarity index 50% rename from samples/ChatSample/RedisPresenceManager.cs rename to samples/ChatSample/RedisUserTracker.cs index 6a3aefdd66..e70de95161 100644 --- a/samples/ChatSample/RedisPresenceManager.cs +++ b/samples/ChatSample/RedisUserTracker.cs @@ -1,53 +1,37 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -/* using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; -using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Redis; using Microsoft.AspNetCore.Sockets; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; namespace ChatSample { - public class RedisPresenceManager : IDisposable, IUserTracker where THub : HubWithPresence + public class RedisUserTracker : IUserTracker { private readonly RedisKey UsersOnlineRedisKey = "UsersOnline"; - private readonly RedisChannel _redisChannel; - - private IHubContext _hubContext; - private HubLifetimeManager _lifetimeManager; - private readonly IServiceScopeFactory _serviceScopeFactory; private readonly int _redisDatabase; private readonly ConnectionMultiplexer _redisConnection; private readonly ISubscriber _redisSubscriber; private readonly ILogger _logger; + private readonly RedisChannel _redisChannel; - private readonly ConcurrentDictionary _connections - = new ConcurrentDictionary(); + public event Action UserJoined; + public event Action UserLeft; - // TODO: subscribe and handle lifecycle events/disconnects - // TODO: handle situations where Redis shuts down - // TODO: handle situations where a server goes down and the server has zombie connections - - public RedisPresenceManager(IHubContext hubContext, HubLifetimeManager lifetimeManager, - IServiceScopeFactory serviceScopeFactory, IOptions options, ILoggerFactory loggerFactory) + public RedisUserTracker(IOptions options, ILoggerFactory loggerFactory) { - _hubContext = hubContext; - _lifetimeManager = lifetimeManager; - _serviceScopeFactory = serviceScopeFactory; - - _logger = loggerFactory.CreateLogger>(); + _logger = loggerFactory.CreateLogger>(); _redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault(); _redisConnection = ConnectToRedis(options.Value, _logger); _redisSubscriber = _redisConnection.GetSubscriber(); @@ -59,11 +43,11 @@ namespace ChatSample var user = ToUserDetails(stringValue.Substring(1)); if (stringValue[0] == '-') { - _ = Notify(hub => hub.OnUserLeft(user)); + UserLeft(user); } else { - _ = Notify(hub => hub.OnUserJoined(user)); + UserJoined(user); } }); } @@ -71,17 +55,20 @@ namespace ChatSample private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) { var loggerTextWriter = new LoggerTextWriter(logger); - return options.Factory == null - ? ConnectionMultiplexer.Connect(options.Options, loggerTextWriter) - : options.Factory(loggerTextWriter); - } + if (options.Factory != null) + { + return options.Factory(loggerTextWriter); + } - public async Task> UsersOnline() - { - var database = _redisConnection.GetDatabase(_redisDatabase); - var usersOnline = await database.SetMembersAsync(UsersOnlineRedisKey); + if (options.Options.EndPoints.Any()) + { + return ConnectionMultiplexer.Connect(options.Options, loggerTextWriter); + } - return usersOnline.Select(u => ToUserDetails(u)); + var configurationOptions = new ConfigurationOptions(); + configurationOptions.EndPoints.Add(IPAddress.Loopback, 0); + configurationOptions.SetDefaultPorts(); + return ConnectionMultiplexer.Connect(configurationOptions, loggerTextWriter); } private static UserDetails ToUserDetails(string user) @@ -91,66 +78,30 @@ namespace ChatSample return new UserDetails(user.Substring(0, pos), user.Substring(pos + 1)); } - public Task UserJoined(Connection connection) + public async Task AddUser(Connection connection, UserDetails userDetails) { - _connections.TryAdd(connection, null); - var database = _redisConnection.GetDatabase(_redisDatabase); var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}"; - // Fire and forget - _ = database.SetAddAsync(UsersOnlineRedisKey, $"{connection.ConnectionId}|{connection.User.Identity.Name}"); + + // need to await to make sure user is added before we call into the Hub + await database.SetAddAsync(UsersOnlineRedisKey, $"{connection.ConnectionId}|{connection.User.Identity.Name}"); _ = _redisSubscriber.PublishAsync(_redisChannel, "+" + user); - - return Task.CompletedTask; } - public Task UserLeft(Connection connection) + public async Task RemoveUser(Connection connection) { - _connections.TryRemove(connection, out object _); - var database = _redisConnection.GetDatabase(_redisDatabase); var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}"; - // Fire and forget - _ = database.SetRemoveAsync(UsersOnlineRedisKey, user); + + await database.SetRemoveAsync(UsersOnlineRedisKey, user); _ = _redisSubscriber.PublishAsync(_redisChannel, "-" + user); - - return Task.CompletedTask; } - private async Task Notify(Func invocation) + public async Task> UsersOnline() { - foreach (var connection in _connections.Keys) - { - using (var scope = _serviceScopeFactory.CreateScope()) - { - var hubActivator = scope.ServiceProvider.GetRequiredService>(); - var hub = hubActivator.Create(); - - hub.Clients = _hubContext.Clients; - hub.Context = new HubCallerContext(connection); - hub.Groups = new GroupManager(connection, _lifetimeManager); - - try - { - await invocation(hub); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Presence notification failed."); - } - finally - { - hubActivator.Release(hub); - } - } - } - } - - public void Dispose() - { - _redisSubscriber.Unsubscribe(_redisChannel); - _redisConnection.Close(); - _redisConnection.Dispose(); + var database = _redisConnection.GetDatabase(_redisDatabase); + var usersOnline = await database.SetMembersAsync(UsersOnlineRedisKey); + return usersOnline.Select(u => ToUserDetails(u)); } private class LoggerTextWriter : TextWriter @@ -171,8 +122,8 @@ namespace ChatSample public override void WriteLine(string value) { _logger.LogDebug(value); + } } } -} -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/samples/ChatSample/Startup.cs b/samples/ChatSample/Startup.cs index 50fd036c1e..39fdff4e2c 100644 --- a/samples/ChatSample/Startup.cs +++ b/samples/ChatSample/Startup.cs @@ -9,6 +9,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Identity.EntityFrameworkCore; using Microsoft.AspNetCore.SignalR; +using Microsoft.AspNetCore.SignalR.Redis; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -54,15 +55,22 @@ namespace ChatSample services.AddTransient(); services.AddTransient(); - // To use Redis scaleout uncomment .AddRedis + // To use Redis scaleout uncomment .AddRedis and uncomment Redis related lines below for presence services.AddSignalR() // .AddRedis() ; services.AddAuthentication(); + services.AddSingleton(typeof(DefaultHubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>)); services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultPresenceHublifetimeMenager<>)); services.AddSingleton(typeof(IUserTracker<>), typeof(InMemoryUserTracker<>)); + + /* + services.AddSingleton(typeof(RedisHubLifetimeManager<>), typeof(RedisHubLifetimeManager<>)); + services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisPresenceHublifetimeMenager<>)); + services.AddSingleton(typeof(IUserTracker<>), typeof(RedisUserTracker<>)); + */ } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.