Multi-server

This commit is contained in:
Pawel Kadluczka 2017-05-03 17:04:26 -07:00 committed by Pawel Kadluczka
parent 7300413d0c
commit 06e3a08ac0
7 changed files with 302 additions and 73 deletions

View File

@ -28,12 +28,12 @@ namespace ChatSample
return _userTracker.UsersOnline();
}
public virtual Task OnUserJoined(UserDetails user)
public virtual Task OnUsersJoined(UserDetails[] user)
{
return Task.CompletedTask;
}
public virtual Task OnUserLeft(UserDetails user)
public virtual Task OnUsersLeft(UserDetails[] user)
{
return Task.CompletedTask;
}

View File

@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
@ -24,22 +25,18 @@ namespace ChatSample.Hubs
}
await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await GetUsersOnline());
await base.OnConnectedAsync();
}
public override Task OnUserJoined(UserDetails user)
public override Task OnUsersJoined(UserDetails[] users)
{
if (user.ConnectionId != Context.ConnectionId)
{
return Clients.Client(Context.ConnectionId).InvokeAsync("UserJoined", user);
}
return Task.CompletedTask;
return Clients.Client(Context.ConnectionId).InvokeAsync("UsersJoined", new[] { users });
}
public override Task OnUserLeft(UserDetails user)
public override Task OnUsersLeft(UserDetails[] users)
{
return Clients.Client(Context.ConnectionId).InvokeAsync("UserLeft", user);
return Clients.Client(Context.ConnectionId).InvokeAsync("UsersLeft", new[] { users });
}
public async Task Send(string message)

View File

@ -14,7 +14,7 @@ namespace ChatSample
Task AddUser(Connection connection, UserDetails userDetails);
Task RemoveUser(Connection connection);
event Action<UserDetails> UserJoined;
event Action<UserDetails> UserLeft;
event Action<UserDetails[]> UsersJoined;
event Action<UserDetails[]> UsersLeft;
}
}

View File

@ -12,8 +12,8 @@ namespace ChatSample
private readonly ConcurrentDictionary<Connection, UserDetails> _usersOnline
= new ConcurrentDictionary<Connection, UserDetails>();
public event Action<UserDetails> UserJoined;
public event Action<UserDetails> UserLeft;
public event Action<UserDetails[]> UsersJoined;
public event Action<UserDetails[]> UsersLeft;
public Task<IEnumerable<UserDetails>> UsersOnline()
=> Task.FromResult(_usersOnline.Values.AsEnumerable());
@ -21,7 +21,7 @@ namespace ChatSample
public Task AddUser(Connection connection, UserDetails userDetails)
{
_usersOnline.TryAdd(connection, userDetails);
UserJoined(userDetails);
UsersJoined(new[] { userDetails });
return Task.CompletedTask;
}
@ -30,7 +30,7 @@ namespace ChatSample
{
if (_usersOnline.TryRemove(connection, out var userDetails))
{
UserLeft(userDetails);
UsersLeft(new[] { userDetails });
}
return Task.CompletedTask;

View File

@ -8,6 +8,7 @@ using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.SignalR.Redis;
using System.Linq;
namespace ChatSample
{
@ -47,8 +48,8 @@ namespace ChatSample
ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
_userTracker = userTracker;
_userTracker.UserJoined += OnUserJoined;
_userTracker.UserLeft += OnUserLeft;
_userTracker.UsersJoined += OnUsersJoined;
_userTracker.UsersLeft += OnUsersLeft;
_serviceScopeFactory = serviceScopeFactory;
_serviceProvider = serviceProvider;
@ -70,14 +71,29 @@ namespace ChatSample
await _userTracker.RemoveUser(connection);
}
private async void OnUserJoined(UserDetails userDetails)
private async void OnUsersJoined(UserDetails[] users)
{
await Notify(hub => hub.OnUserJoined(userDetails));
await Notify(hub =>
{
if (users.Length == 1)
{
if (users[0].ConnectionId != hub.Context.ConnectionId)
{
return hub.OnUsersJoined(users);
}
}
else
{
return hub.OnUsersJoined(
users.Where(u => u.ConnectionId != hub.Context.Connection.ConnectionId).ToArray());
}
return Task.CompletedTask;
});
}
private async void OnUserLeft(UserDetails userDetails)
private async void OnUsersLeft(UserDetails[] users)
{
await Notify(hub => hub.OnUserLeft(userDetails));
await Notify(hub => hub.OnUsersLeft(users));
}
private async Task Notify(Func<THub, Task> invocation)
@ -117,8 +133,8 @@ namespace ChatSample
public void Dispose()
{
_userTracker.UserJoined -= OnUserJoined;
_userTracker.UserLeft -= OnUserLeft;
_userTracker.UsersJoined -= OnUsersJoined;
_userTracker.UsersLeft -= OnUsersLeft;
}
public override Task InvokeAllAsync(string methodName, object[] args)

View File

@ -7,6 +7,7 @@ using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.AspNetCore.Sockets;
@ -21,45 +22,83 @@ namespace ChatSample
{
private readonly string ServerId = $"server:{Guid.NewGuid().ToString("D")}";
private readonly RedisKey ServerIndexRedisKey = "ServerIndex";
private readonly RedisKey LastSeenRedisKey;
private readonly RedisKey UserIndexRedisKey;
private readonly int _redisDatabase;
private const int ScanInterval = 5; //seconds
private const int ServerInactivityTimeout = 30; // seconds
private readonly ConnectionMultiplexer _redisConnection;
private readonly IDatabase _redisDatabase;
private readonly ISubscriber _redisSubscriber;
private readonly ILogger _logger;
private const string UserAddedChannelName = "UserAdded";
private const string UserRemovedChannelName = "UserRemoved";
private readonly RedisChannel _userAddedChannel;
private readonly RedisChannel _userRemovedChannel;
public event Action<UserDetails> UserJoined;
public event Action<UserDetails> UserLeft;
private readonly ILogger _logger;
private HashSet<string> _serverIds = new HashSet<string>();
private readonly UserEqualityComparer _userEqualityComparer = new UserEqualityComparer();
private HashSet<UserDetails> _users;
private readonly object _lockObj = new object();
private readonly SemaphoreSlim _userSyncSempaphore = new SemaphoreSlim(initialCount: 1);
private readonly Timer _timer;
public event Action<UserDetails[]> UsersJoined;
public event Action<UserDetails[]> UsersLeft;
public RedisUserTracker(IOptions<RedisOptions> options, ILoggerFactory loggerFactory)
{
LastSeenRedisKey = $"{ServerId}:last-seen";
UserIndexRedisKey = $"{ServerId}:users";
_users = new HashSet<UserDetails>(_userEqualityComparer);
_logger = loggerFactory.CreateLogger<RedisUserTracker<THub>>();
_redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault();
_redisConnection = StartRedisConnection(options.Value);
(_redisConnection, _redisDatabase) = StartRedisConnection(options.Value);
_timer = new Timer(Scan, this, TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(ScanInterval));
_logger.LogInformation("Started RedisUserTracker with Id: {0}", ServerId);
_redisSubscriber = _redisConnection.GetSubscriber();
_userAddedChannel = new RedisChannel(UserAddedChannelName, RedisChannel.PatternMode.Literal);
_userRemovedChannel = new RedisChannel(UserRemovedChannelName, RedisChannel.PatternMode.Literal);
_redisSubscriber.Subscribe(_userAddedChannel, (channel, value) => UserJoined(DeserializerUser(value)));
_redisSubscriber.Subscribe(_userRemovedChannel, (channel, value) => UserLeft(DeserializerUser(value)));
_redisSubscriber.Subscribe(_userAddedChannel, (channel, value) =>
{
var user = DeserializerUser(value);
lock (_lockObj)
{
_users.Add(user);
}
UsersJoined(new[] { user });
});
_redisSubscriber.Subscribe(_userRemovedChannel, (channel, value) =>
{
var user = DeserializerUser(value);
lock (_lockObj)
{
_users.Remove(user);
}
UsersLeft(new[] { user });
});
}
private ConnectionMultiplexer StartRedisConnection(RedisOptions options)
private (ConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options)
{
// TODO: handle connection failures
var redisConnection = ConnectToRedis(options, _logger);
var redisDatabase = redisConnection.GetDatabase(options.Options.DefaultDatabase.GetValueOrDefault());
// Register connection
var database = redisConnection.GetDatabase(_redisDatabase);
database.SetAdd(ServerIndexRedisKey, ServerId);
redisDatabase.SetAdd(ServerIndexRedisKey, ServerId);
redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks);
return redisConnection;
return (redisConnection, redisDatabase);
}
private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
@ -82,49 +121,204 @@ namespace ChatSample
return ConnectionMultiplexer.Connect(configurationOptions, loggerTextWriter);
}
public Task<IEnumerable<UserDetails>> UsersOnline()
{
lock(_lockObj)
{
return Task.FromResult(_users.ToArray().AsEnumerable());
}
}
public async Task AddUser(Connection connection, UserDetails userDetails)
{
var database = _redisConnection.GetDatabase(_redisDatabase);
var key = GetUserRedisKey(connection);
var user = SerializeUser(connection);
// need to await to make sure user is added before we call into the Hub
await database.StringSetAsync(key, SerializeUser(connection));
await database.SetAddAsync(UserIndexRedisKey, key);
_ = _redisSubscriber.PublishAsync(_userAddedChannel, user);
await _userSyncSempaphore.WaitAsync();
try
{
await _redisDatabase.ScriptEvaluateAsync(
@"redis.call('set', KEYS[1], ARGV[1])
redis.call('sadd', KEYS[2], KEYS[1])",
new RedisKey[] { key, UserIndexRedisKey },
new RedisValue[] { SerializeUser(connection) });
lock (_lockObj)
{
_users.Add(userDetails);
}
_ = _redisSubscriber.PublishAsync(_userAddedChannel, user);
}
finally
{
_userSyncSempaphore.Release();
}
}
public async Task RemoveUser(Connection connection)
{
var database = _redisConnection.GetDatabase(_redisDatabase);
await database.SetRemoveAsync(UserIndexRedisKey, connection.ConnectionId);
if (await database.KeyDeleteAsync(GetUserRedisKey(connection)))
await _userSyncSempaphore.WaitAsync();
try
{
_ = _redisSubscriber.PublishAsync(_userRemovedChannel, SerializeUser(connection));
var userKey = GetUserRedisKey(connection);
await _redisDatabase.SetRemoveAsync(UserIndexRedisKey, userKey);
if (await _redisDatabase.KeyDeleteAsync(userKey))
{
lock (_lockObj)
{
// TODO: remove without creating the object
_users.Remove(new UserDetails(connection.ConnectionId, name: null));
}
_ = _redisSubscriber.PublishAsync(_userRemovedChannel, SerializeUser(connection));
}
}
}
public async Task<IEnumerable<UserDetails>> UsersOnline()
{
var database = _redisConnection.GetDatabase(_redisDatabase);
var userIds = await database.ScriptEvaluateAsync(
@"local keys = { }
for i, key in pairs(redis.call('smembers', KEYS[1])) do
table.insert(keys, key.. ':users')
end
return redis.call('sunion', unpack(keys))", new[] { ServerIndexRedisKey });
if (!userIds.IsNull)
finally
{
var users = await database.StringGetAsync(((RedisValue[])userIds).Select(id => (RedisKey)(string)id).ToArray());
return users.Where(user => !user.IsNull).Select(user => DeserializerUser(user));
_userSyncSempaphore.Release();
}
return Enumerable.Empty<UserDetails>();
}
private static string GetUserRedisKey(Connection connection) => $"user:{connection.ConnectionId}";
private static void Scan(object state)
{
_ = ((RedisUserTracker<THub>)state).Scan();
}
private async Task Scan()
{
try
{
_logger.LogDebug("Scanning for presence changes");
_redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks);
await RemoveExpiredServers();
await CheckForServerChanges();
_logger.LogDebug("Completed scanning for presence changes");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error while checking presence changes.");
}
}
private async Task RemoveExpiredServers()
{
// remove expired servers from server index
var expiredServers = await _redisDatabase.ScriptEvaluateAsync(
@"local expired_servers = { }
local count = 0
for _, server_key in pairs(redis.call('smembers', KEYS[1])) do
local last_seen = tonumber(redis.call('get', server_key..':last-seen'))
if last_seen ~= nil and tonumber(ARGV[1]) - last_seen > tonumber(ARGV[2]) then
table.insert(expired_servers, server_key)
count = count + 1
end
end
if count > 0 then
redis.call('srem', KEYS[1], unpack(expired_servers))
end
return expired_servers",
new[] { ServerIndexRedisKey },
new RedisValue[] { DateTimeOffset.UtcNow.Ticks, TimeSpan.FromSeconds(ServerInactivityTimeout).Ticks });
// remove users
// TODO: this will probably have to be atomic with the previous script in case a server rejoins and populates
// the list of users
foreach (string expiredServerKey in (RedisValue[])expiredServers)
{
await _redisDatabase.ScriptEvaluateAsync(
@"local key = KEYS[1]
if redis.call('exists', key) == 1 then
redis.call('del', unpack(redis.call('smembers', key)))
end
redis.call('del', key..':last-seen', key..':users')",
new RedisKey[] { expiredServerKey });
}
if (((RedisValue[])expiredServers).Any())
{
_logger.LogInformation("Removed entries for expired servers. {0}",
string.Join(",", (RedisValue[])expiredServers));
}
}
private async Task CheckForServerChanges()
{
var activeServers = new HashSet<string>((await _redisDatabase.SetMembersAsync(ServerIndexRedisKey)).Select(v=>(string)v));
var synchronizeUsers = false;
lock (_lockObj)
{
if (activeServers.Count != _serverIds.Count || activeServers.Any(i => !_serverIds.Contains(i)))
{
_serverIds = activeServers;
synchronizeUsers = true;
}
}
if (synchronizeUsers)
{
await SynchronizeUsers();
}
}
private async Task SynchronizeUsers()
{
await _userSyncSempaphore.WaitAsync();
try
{
var remoteUsersJson = await _redisDatabase.ScriptEvaluateAsync(
@"local server_keys = { }
for _, key in pairs(redis.call('smembers', KEYS[1])) do
table.insert(server_keys, key.. ':users')
end
local user_keys = redis.call('sunion', unpack(server_keys))
local users = { }
if next(user_keys) ~= nil then
users = redis.call('mget', unpack(user_keys))
end
return users
", new[] { ServerIndexRedisKey });
var remoteUsers = new HashSet<UserDetails>(
((RedisValue[])remoteUsersJson)
.Where(u => u.HasValue)
.Select(userJson => DeserializerUser(userJson)), _userEqualityComparer);
UserDetails[] newUsers, zombieUsers;
lock (_lockObj)
{
newUsers = remoteUsers.Except(_users, _userEqualityComparer).ToArray();
zombieUsers = _users.Except(remoteUsers, _userEqualityComparer).ToArray();
_users = remoteUsers;
}
if (zombieUsers.Any())
{
_logger.LogDebug("Removing zombie users: {0}", string.Join(",", zombieUsers.Select(u => u.ConnectionId)));
UsersLeft(zombieUsers);
}
if (newUsers.Any())
{
_logger.LogDebug("Adding new users: {0}", string.Join(",", newUsers.Select(u => u.ConnectionId)));
UsersJoined(newUsers);
}
}
finally
{
_userSyncSempaphore.Release();
}
}
private static string SerializeUser(Connection connection) =>
$"{{ \"ConnectionID\": \"{connection.ConnectionId}\", \"Name\": \"{connection.User.Identity.Name}\" }}";
@ -133,7 +327,22 @@ namespace ChatSample
public void Dispose()
{
_timer.Dispose();
_redisSubscriber.UnsubscribeAll();
_redisConnection.Dispose();
}
private class UserEqualityComparer : IEqualityComparer<UserDetails>
{
public bool Equals(UserDetails u1, UserDetails u2)
{
return ReferenceEquals(u1, u2) || u1.ConnectionId == u2.ConnectionId;
}
public int GetHashCode(UserDetails u)
{
return u.ConnectionId.GetHashCode();
}
}
private class LoggerTextWriter : TextWriter

View File

@ -28,18 +28,22 @@ connection.onClosed = e => {
}
};
connection.on('SetUsersOnline', function (usersOnline) {
connection.on('SetUsersOnline', usersOnline => {
usersOnline.forEach(user => addUserOnline(user));
});
connection.on('UserJoined', user => {
appendLine('User ' + user.Name + ' joined the chat');
addUserOnline(user);
connection.on('UsersJoined', users => {
users.forEach(user => {
appendLine('User ' + user.Name + ' joined the chat');
addUserOnline(user);
});
});
connection.on('UserLeft', user => {
appendLine('User ' + user.Name + ' left the chat');
document.getElementById(user.ConnectionId).outerHTML = '';
connection.on('UsersLeft', users => {
users.forEach(user => {
appendLine('User ' + user.Name + ' left the chat');
document.getElementById(user.ConnectionId).outerHTML = '';
});
});
connection.on('Send', (userName, message) => {
@ -72,9 +76,12 @@ function appendLine(line, color) {
document.getElementById('messages').appendChild(child);
};
function addUserOnline (user) {
function addUserOnline(user) {
if (document.getElementById(user.ConnectionId)) {
return;
}
var userLi = document.createElement('li');
userLi.innerText = user.Name;
userLi.innerText = `user.Name (${user.ConnectionId})`;
userLi.id = user.ConnectionId;
document.getElementById('users').appendChild(userLi);
}