Redis presence manager

This commit is contained in:
Pawel Kadluczka 2017-04-27 10:01:12 -07:00
parent 7713124bd8
commit cf2d6ce697
7 changed files with 68 additions and 99 deletions

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
@ -24,12 +23,9 @@ namespace ChatSample
_userTracker = userTracker; _userTracker = userTracker;
} }
public Task<IEnumerable<UserDetails>> UsersOnline public Task<IEnumerable<UserDetails>> GetUsersOnline()
{ {
get return _userTracker.UsersOnline();
{
return _userTracker.UsersOnline();
}
} }
public virtual Task OnUserJoined(UserDetails user) public virtual Task OnUserJoined(UserDetails user)

View File

@ -20,9 +20,10 @@ namespace ChatSample.Hubs
if (!Context.User.Identity.IsAuthenticated) if (!Context.User.Identity.IsAuthenticated)
{ {
Context.Connection.Dispose(); Context.Connection.Dispose();
return;
} }
await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await UsersOnline); await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await GetUsersOnline());
await base.OnConnectedAsync(); await base.OnConnectedAsync();
} }

View File

@ -11,8 +11,8 @@ namespace ChatSample
public interface IUserTracker<out THub> public interface IUserTracker<out THub>
{ {
Task<IEnumerable<UserDetails>> UsersOnline(); Task<IEnumerable<UserDetails>> UsersOnline();
Task AddUser(Connection connection, UserDetails user); Task AddUser(Connection connection, UserDetails userDetails);
Task<UserDetails> RemoveUser(Connection connection); Task RemoveUser(Connection connection);
event Action<UserDetails> UserJoined; event Action<UserDetails> UserJoined;
event Action<UserDetails> UserLeft; event Action<UserDetails> UserLeft;

View File

@ -18,22 +18,22 @@ namespace ChatSample
public Task<IEnumerable<UserDetails>> UsersOnline() public Task<IEnumerable<UserDetails>> UsersOnline()
=> Task.FromResult(_usersOnline.Values.AsEnumerable()); => Task.FromResult(_usersOnline.Values.AsEnumerable());
public Task AddUser(Connection connection, UserDetails user) public Task AddUser(Connection connection, UserDetails userDetails)
{ {
_usersOnline.TryAdd(connection, user); _usersOnline.TryAdd(connection, userDetails);
UserJoined(user); UserJoined(userDetails);
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task<UserDetails> RemoveUser(Connection connection) public Task RemoveUser(Connection connection)
{ {
if (_usersOnline.TryRemove(connection, out var userDetails)) if (_usersOnline.TryRemove(connection, out var userDetails))
{ {
UserLeft(userDetails); UserLeft(userDetails);
} }
return Task.FromResult(userDetails); return Task.CompletedTask;
} }
} }
} }

View File

@ -1,10 +1,13 @@
 // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Sockets; using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.SignalR.Redis;
namespace ChatSample namespace ChatSample
{ {
@ -18,6 +21,16 @@ namespace ChatSample
} }
} }
public class RedisPresenceHublifetimeMenager<THub> : PresenceHubLifetimeManager<THub, RedisHubLifetimeManager<THub>>
where THub : HubWithPresence
{
public RedisPresenceHublifetimeMenager(IUserTracker<THub> userTracker, IServiceScopeFactory serviceScopeFactory,
ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
: base(userTracker, serviceScopeFactory, loggerFactory, serviceProvider)
{
}
}
public class PresenceHubLifetimeManager<THub, THubLifetimeManager> : HubLifetimeManager<THub>, IDisposable public class PresenceHubLifetimeManager<THub, THubLifetimeManager> : HubLifetimeManager<THub>, IDisposable
where THubLifetimeManager : HubLifetimeManager<THub> where THubLifetimeManager : HubLifetimeManager<THub>
where THub : HubWithPresence where THub : HubWithPresence

View File

@ -1,53 +1,37 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
/*
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Net;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Redis; using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.AspNetCore.Sockets; using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using StackExchange.Redis; using StackExchange.Redis;
namespace ChatSample namespace ChatSample
{ {
public class RedisPresenceManager<THub> : IDisposable, IUserTracker<THub> where THub : HubWithPresence public class RedisUserTracker<THub> : IUserTracker<THub>
{ {
private readonly RedisKey UsersOnlineRedisKey = "UsersOnline"; private readonly RedisKey UsersOnlineRedisKey = "UsersOnline";
private readonly RedisChannel _redisChannel;
private IHubContext<THub> _hubContext;
private HubLifetimeManager<THub> _lifetimeManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly int _redisDatabase; private readonly int _redisDatabase;
private readonly ConnectionMultiplexer _redisConnection; private readonly ConnectionMultiplexer _redisConnection;
private readonly ISubscriber _redisSubscriber; private readonly ISubscriber _redisSubscriber;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly RedisChannel _redisChannel;
private readonly ConcurrentDictionary<Connection, object> _connections public event Action<UserDetails> UserJoined;
= new ConcurrentDictionary<Connection, object>(); public event Action<UserDetails> UserLeft;
// TODO: subscribe and handle lifecycle events/disconnects public RedisUserTracker(IOptions<RedisOptions> options, ILoggerFactory loggerFactory)
// TODO: handle situations where Redis shuts down
// TODO: handle situations where a server goes down and the server has zombie connections
public RedisPresenceManager(IHubContext<THub> hubContext, HubLifetimeManager<THub> lifetimeManager,
IServiceScopeFactory serviceScopeFactory, IOptions<RedisOptions> options, ILoggerFactory loggerFactory)
{ {
_hubContext = hubContext; _logger = loggerFactory.CreateLogger<RedisUserTracker<THub>>();
_lifetimeManager = lifetimeManager;
_serviceScopeFactory = serviceScopeFactory;
_logger = loggerFactory.CreateLogger<RedisPresenceManager<THub>>();
_redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault(); _redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault();
_redisConnection = ConnectToRedis(options.Value, _logger); _redisConnection = ConnectToRedis(options.Value, _logger);
_redisSubscriber = _redisConnection.GetSubscriber(); _redisSubscriber = _redisConnection.GetSubscriber();
@ -59,11 +43,11 @@ namespace ChatSample
var user = ToUserDetails(stringValue.Substring(1)); var user = ToUserDetails(stringValue.Substring(1));
if (stringValue[0] == '-') if (stringValue[0] == '-')
{ {
_ = Notify(hub => hub.OnUserLeft(user)); UserLeft(user);
} }
else else
{ {
_ = Notify(hub => hub.OnUserJoined(user)); UserJoined(user);
} }
}); });
} }
@ -71,17 +55,20 @@ namespace ChatSample
private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
{ {
var loggerTextWriter = new LoggerTextWriter(logger); var loggerTextWriter = new LoggerTextWriter(logger);
return options.Factory == null if (options.Factory != null)
? ConnectionMultiplexer.Connect(options.Options, loggerTextWriter) {
: options.Factory(loggerTextWriter); return options.Factory(loggerTextWriter);
} }
public async Task<IEnumerable<UserDetails>> UsersOnline() if (options.Options.EndPoints.Any())
{ {
var database = _redisConnection.GetDatabase(_redisDatabase); return ConnectionMultiplexer.Connect(options.Options, loggerTextWriter);
var usersOnline = await database.SetMembersAsync(UsersOnlineRedisKey); }
return usersOnline.Select(u => ToUserDetails(u)); var configurationOptions = new ConfigurationOptions();
configurationOptions.EndPoints.Add(IPAddress.Loopback, 0);
configurationOptions.SetDefaultPorts();
return ConnectionMultiplexer.Connect(configurationOptions, loggerTextWriter);
} }
private static UserDetails ToUserDetails(string user) private static UserDetails ToUserDetails(string user)
@ -91,66 +78,30 @@ namespace ChatSample
return new UserDetails(user.Substring(0, pos), user.Substring(pos + 1)); return new UserDetails(user.Substring(0, pos), user.Substring(pos + 1));
} }
public Task UserJoined(Connection connection) public async Task AddUser(Connection connection, UserDetails userDetails)
{ {
_connections.TryAdd(connection, null);
var database = _redisConnection.GetDatabase(_redisDatabase); var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}"; var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
// Fire and forget
_ = database.SetAddAsync(UsersOnlineRedisKey, $"{connection.ConnectionId}|{connection.User.Identity.Name}"); // need to await to make sure user is added before we call into the Hub
await database.SetAddAsync(UsersOnlineRedisKey, $"{connection.ConnectionId}|{connection.User.Identity.Name}");
_ = _redisSubscriber.PublishAsync(_redisChannel, "+" + user); _ = _redisSubscriber.PublishAsync(_redisChannel, "+" + user);
return Task.CompletedTask;
} }
public Task UserLeft(Connection connection) public async Task RemoveUser(Connection connection)
{ {
_connections.TryRemove(connection, out object _);
var database = _redisConnection.GetDatabase(_redisDatabase); var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}"; var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
// Fire and forget
_ = database.SetRemoveAsync(UsersOnlineRedisKey, user); await database.SetRemoveAsync(UsersOnlineRedisKey, user);
_ = _redisSubscriber.PublishAsync(_redisChannel, "-" + user); _ = _redisSubscriber.PublishAsync(_redisChannel, "-" + user);
return Task.CompletedTask;
} }
private async Task Notify(Func<THub, Task> invocation) public async Task<IEnumerable<UserDetails>> UsersOnline()
{ {
foreach (var connection in _connections.Keys) var database = _redisConnection.GetDatabase(_redisDatabase);
{ var usersOnline = await database.SetMembersAsync(UsersOnlineRedisKey);
using (var scope = _serviceScopeFactory.CreateScope()) return usersOnline.Select(u => ToUserDetails(u));
{
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub, IClientProxy>>();
var hub = hubActivator.Create();
hub.Clients = _hubContext.Clients;
hub.Context = new HubCallerContext(connection);
hub.Groups = new GroupManager<THub>(connection, _lifetimeManager);
try
{
await invocation(hub);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Presence notification failed.");
}
finally
{
hubActivator.Release(hub);
}
}
}
}
public void Dispose()
{
_redisSubscriber.Unsubscribe(_redisChannel);
_redisConnection.Close();
_redisConnection.Dispose();
} }
private class LoggerTextWriter : TextWriter private class LoggerTextWriter : TextWriter
@ -171,8 +122,8 @@ namespace ChatSample
public override void WriteLine(string value) public override void WriteLine(string value)
{ {
_logger.LogDebug(value); _logger.LogDebug(value);
} }
} }
} }
} }
*/

View File

@ -9,6 +9,7 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Identity.EntityFrameworkCore; using Microsoft.AspNetCore.Identity.EntityFrameworkCore;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -54,15 +55,22 @@ namespace ChatSample
services.AddTransient<IEmailSender, AuthMessageSender>(); services.AddTransient<IEmailSender, AuthMessageSender>();
services.AddTransient<ISmsSender, AuthMessageSender>(); services.AddTransient<ISmsSender, AuthMessageSender>();
// To use Redis scaleout uncomment .AddRedis // To use Redis scaleout uncomment .AddRedis and uncomment Redis related lines below for presence
services.AddSignalR() services.AddSignalR()
// .AddRedis() // .AddRedis()
; ;
services.AddAuthentication(); services.AddAuthentication();
services.AddSingleton(typeof(DefaultHubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>)); services.AddSingleton(typeof(DefaultHubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultPresenceHublifetimeMenager<>)); services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultPresenceHublifetimeMenager<>));
services.AddSingleton(typeof(IUserTracker<>), typeof(InMemoryUserTracker<>)); services.AddSingleton(typeof(IUserTracker<>), typeof(InMemoryUserTracker<>));
/*
services.AddSingleton(typeof(RedisHubLifetimeManager<>), typeof(RedisHubLifetimeManager<>));
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisPresenceHublifetimeMenager<>));
services.AddSingleton(typeof(IUserTracker<>), typeof(RedisUserTracker<>));
*/
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.