Splitting PresenceManager

This commit is contained in:
Pawel Kadluczka 2017-04-25 16:43:08 -07:00
parent 19b390c5ab
commit 7f3baf5ce6
8 changed files with 164 additions and 118 deletions

View File

@ -1,84 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace ChatSample
{
public class DefaultPresenceManager<THub> : IPresenceManager where THub : HubWithPresence
{
private IHubContext<THub> _hubContext;
private HubLifetimeManager<THub> _lifetimeManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger _logger;
public DefaultPresenceManager(IHubContext<THub> hubContext, HubLifetimeManager<THub> lifetimeManager,
IServiceScopeFactory serviceScopeFactory, ILoggerFactory loggerFactory)
{
_hubContext = hubContext;
_lifetimeManager = lifetimeManager;
_serviceScopeFactory = serviceScopeFactory;
_logger = loggerFactory.CreateLogger<DefaultPresenceManager<THub>>();
}
private readonly ConcurrentDictionary<Connection, UserDetails> _usersOnline
= new ConcurrentDictionary<Connection, UserDetails>();
public Task<IEnumerable<UserDetails>> UsersOnline()
=> Task.FromResult(_usersOnline.Values.AsEnumerable());
public async Task UserJoined(Connection connection)
{
var user = new UserDetails(connection.ConnectionId, connection.User.Identity.Name);
await Notify(hub => hub.OnUserJoined(user));
_usersOnline.TryAdd(connection, user);
}
public async Task UserLeft(Connection connection)
{
if (_usersOnline.TryRemove(connection, out var userDetails))
{
await Notify(hub => hub.OnUserLeft(userDetails));
}
}
private async Task Notify(Func<THub, Task> invocation)
{
foreach (var connection in _usersOnline.Keys)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub, IClientProxy>>();
var hub = hubActivator.Create();
hub.Clients = _hubContext.Clients;
hub.Context = new HubCallerContext(connection);
hub.Groups = new GroupManager<THub>(connection, _lifetimeManager);
try
{
await invocation(hub);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Presence notification failed.");
}
finally
{
hubActivator.Release(hub);
}
}
}
}
}
}

View File

@ -10,41 +10,28 @@ namespace ChatSample
{
public class HubWithPresence : HubWithPresence<IClientProxy>
{
public HubWithPresence(IPresenceManager presenceManager)
: base(presenceManager)
{
}
public HubWithPresence(IUserTracker<HubWithPresence> userTracker)
: base(userTracker)
{ }
}
public class HubWithPresence<TClient> : Hub<TClient>
{
private IPresenceManager _presenceManager;
private IUserTracker<HubWithPresence<TClient>> _userTracker;
public HubWithPresence(IPresenceManager presenceManager)
public HubWithPresence(IUserTracker<HubWithPresence<TClient>> userTracker)
{
_presenceManager = presenceManager;
_userTracker = userTracker;
}
public Task<IEnumerable<UserDetails>> UsersOnline
{
get
{
return _presenceManager.UsersOnline();
return _userTracker.UsersOnline();
}
}
public override Task OnConnectedAsync()
{
_presenceManager.UserJoined(Context.Connection);
return base.OnConnectedAsync();
}
public override Task OnDisconnectedAsync(Exception exception)
{
_presenceManager.UserLeft(Context.Connection);
return base.OnDisconnectedAsync(exception);
}
public virtual Task OnUserJoined(UserDetails user)
{
return Task.CompletedTask;

View File

@ -10,8 +10,8 @@ namespace ChatSample.Hubs
[Authorize]
public class Chat : HubWithPresence
{
public Chat(IPresenceManager presenceManager)
: base(presenceManager)
public Chat(IUserTracker<Chat> userTracker)
: base(userTracker)
{
}

View File

@ -1,16 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace ChatSample
{
public interface IPresenceManager
public interface IUserTracker<out THub>
{
Task<IEnumerable<UserDetails>> UsersOnline();
Task UserJoined(Connection connection);
Task UserLeft(Connection connection);
Task AddUser(Connection connection, UserDetails user);
Task<UserDetails> RemoveUser(Connection connection);
event Action<UserDetails> UserJoined;
event Action<UserDetails> UserLeft;
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace ChatSample
{
public class InMemoryUserTracker<THub> : IUserTracker<THub>
{
private readonly ConcurrentDictionary<Connection, UserDetails> _usersOnline
= new ConcurrentDictionary<Connection, UserDetails>();
public event Action<UserDetails> UserJoined;
public event Action<UserDetails> UserLeft;
public Task<IEnumerable<UserDetails>> UsersOnline()
=> Task.FromResult(_usersOnline.Values.AsEnumerable());
public Task AddUser(Connection connection, UserDetails user)
{
_usersOnline.TryAdd(connection, user);
UserJoined(user);
return Task.CompletedTask;
}
public Task<UserDetails> RemoveUser(Connection connection)
{
if (_usersOnline.TryRemove(connection, out var userDetails))
{
UserLeft(userDetails);
}
return Task.FromResult(userDetails);
}
}
}

View File

@ -0,0 +1,99 @@

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace ChatSample
{
public class PresenceHubLifetimeManager<THub> : DefaultHubLifetimeManager<THub>, IDisposable
where THub : HubWithPresence
{
private readonly ConnectionList _connections = new ConnectionList();
private readonly IUserTracker<THub> _userTracker;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private IHubContext<THub> _hubContext;
public PresenceHubLifetimeManager(InvocationAdapterRegistry registry, IUserTracker<THub> userTracker,
IServiceScopeFactory serviceScopeFactory, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
: base(registry)
{
_userTracker = userTracker;
_userTracker.UserJoined += OnUserJoined;
_userTracker.UserLeft += OnUserLeft;
_serviceScopeFactory = serviceScopeFactory;
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<PresenceHubLifetimeManager<THub>>();
}
public override async Task OnConnectedAsync(Connection connection)
{
await base.OnConnectedAsync(connection);
_connections.Add(connection);
await _userTracker.AddUser(connection, new UserDetails(connection.ConnectionId, connection.User.Identity.Name));
}
public override async Task OnDisconnectedAsync(Connection connection)
{
await base.OnDisconnectedAsync(connection);
_connections.Remove(connection);
await _userTracker.RemoveUser(connection);
}
private async void OnUserJoined(UserDetails userDetails)
{
await Notify(hub => hub.OnUserJoined(userDetails));
}
private async void OnUserLeft(UserDetails userDetails)
{
await Notify(hub => hub.OnUserLeft(userDetails));
}
private async Task Notify(Func<THub, Task> invocation)
{
foreach (var connection in _connections)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub, IClientProxy>>();
var hub = hubActivator.Create();
if (_hubContext == null)
{
// Cannot be injected due to circular dependency
_hubContext = _serviceProvider.GetRequiredService<IHubContext<THub>>();
}
hub.Clients = _hubContext.Clients;
hub.Context = new HubCallerContext(connection);
hub.Groups = new GroupManager<THub>(connection, this);
try
{
await invocation(hub);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Presence notification failed.");
}
finally
{
hubActivator.Release(hub);
}
}
}
}
public void Dispose()
{
_userTracker.UserJoined -= OnUserJoined;
_userTracker.UserLeft -= OnUserLeft;
}
}
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
/*
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@ -19,7 +20,7 @@ using StackExchange.Redis;
namespace ChatSample
{
public class RedisPresenceManager<THub> : IDisposable, IPresenceManager where THub : HubWithPresence
public class RedisPresenceManager<THub> : IDisposable, IUserTracker<THub> where THub : HubWithPresence
{
private readonly RedisKey UsersOnlineRedisKey = "UsersOnline";
private readonly RedisChannel _redisChannel;
@ -32,7 +33,7 @@ namespace ChatSample
private readonly ISubscriber _redisSubscriber;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<Connection, object> connections
private readonly ConcurrentDictionary<Connection, object> _connections
= new ConcurrentDictionary<Connection, object>();
// TODO: subscribe and handle lifecycle events/disconnects
@ -92,7 +93,7 @@ namespace ChatSample
public Task UserJoined(Connection connection)
{
connections.TryAdd(connection, null);
_connections.TryAdd(connection, null);
var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
@ -105,7 +106,7 @@ namespace ChatSample
public Task UserLeft(Connection connection)
{
connections.TryRemove(connection, out object _);
_connections.TryRemove(connection, out object _);
var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
@ -118,7 +119,7 @@ namespace ChatSample
private async Task Notify(Func<THub, Task> invocation)
{
foreach (var connection in connections.Keys)
foreach (var connection in _connections.Keys)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
@ -174,3 +175,4 @@ namespace ChatSample
}
}
}
*/

View File

@ -54,15 +54,14 @@ namespace ChatSample
services.AddTransient<IEmailSender, AuthMessageSender>();
services.AddTransient<ISmsSender, AuthMessageSender>();
// To use Redis scaleout uncomment .AddRedis and register RedisPresenceManager
// instead of DefaultPresenceManager
// To use Redis scaleout uncomment .AddRedis
services.AddSignalR()
// .AddRedis()
;
services.AddAuthentication();
services.AddSingleton<IPresenceManager, DefaultPresenceManager<Chat>>();
// services.AddSingleton<IPresenceManager, RedisPresenceManager<Chat>>();
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(PresenceHubLifetimeManager<>));
services.AddSingleton(typeof(IUserTracker<>), typeof(InMemoryUserTracker<>));
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.