Redis based scaleout presence

This commit is contained in:
moozzyk 2017-04-07 07:35:01 -07:00 committed by Pawel Kadluczka
parent 632c8abf77
commit 19b390c5ab
13 changed files with 252 additions and 84 deletions

View File

@ -11,6 +11,7 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\client-ts\Microsoft.AspNetCore.SignalR.Client.TS\Microsoft.AspNetCore.SignalR.Client.TS.csproj" />
</ItemGroup>
@ -26,14 +27,14 @@
<PackageReference Include="Microsoft.AspNetCore.Server.IISIntegration" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="$(AspNetCoreVersion)" >
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="$(AspNetCoreVersion)">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.Design" Version="$(AspNetCoreVersion)" >
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.Design" Version="$(AspNetCoreVersion)">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="$(AspNetCoreVersion)" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="$(AspNetCoreVersion)" >
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="$(AspNetCoreVersion)">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="$(AspNetCoreVersion)" />
@ -43,8 +44,7 @@
</ItemGroup>
<Target Name="CopyTSClient" BeforeTargets="AfterBuild">
<Copy SourceFiles="$(MSBuildThisFileDirectory)..\..\client-ts\dist\browser\signalr-client.js"
DestinationFolder="$(MSBuildThisFileDirectory)wwwroot\lib\signalr-client" />
<Copy SourceFiles="$(MSBuildThisFileDirectory)..\..\client-ts\dist\browser\signalr-client.js" DestinationFolder="$(MSBuildThisFileDirectory)wwwroot\lib\signalr-client" />
</Target>
<Target Name="BowerInstall" BeforeTargets="PrepareForPublish">

View File

@ -1,54 +1,60 @@

using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace ChatSample
{
// TODO: not possible to use TClient instead of (implicit) IClientProxy
// public class DefaultPresenceManager<THub> : IPresenceManager where THub : HubWithPresence<TClient>
public class DefaultPresenceManager<THub> : IPresenceManager where THub : HubWithPresence
{
private IHubContext<THub> _hubContext;
private HubLifetimeManager<THub> _lifetimeManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger _logger;
public DefaultPresenceManager(IHubContext<THub> hubContext, HubLifetimeManager<THub> lifetimeManager, IServiceScopeFactory serviceScopeFactory)
public DefaultPresenceManager(IHubContext<THub> hubContext, HubLifetimeManager<THub> lifetimeManager,
IServiceScopeFactory serviceScopeFactory, ILoggerFactory loggerFactory)
{
_hubContext = hubContext;
_lifetimeManager = lifetimeManager;
_serviceScopeFactory = serviceScopeFactory;
_logger = loggerFactory.CreateLogger<DefaultPresenceManager<THub>>();
}
private readonly ConcurrentDictionary<Connection, UserDetails> usersOnline
private readonly ConcurrentDictionary<Connection, UserDetails> _usersOnline
= new ConcurrentDictionary<Connection, UserDetails>();
public IEnumerable<UserDetails> UsersOnline => usersOnline.Values;
public Task<IEnumerable<UserDetails>> UsersOnline()
=> Task.FromResult(_usersOnline.Values.AsEnumerable());
public async Task UserJoined(Connection connection)
{
// `context.User?.Identity?.Name ?? string.Empty` ?
var user = new UserDetails(connection.ConnectionId, connection.User.Identity.Name);
await Notify(hub => hub.OnUserJoined(user));
usersOnline.TryAdd(connection, user);
_usersOnline.TryAdd(connection, user);
}
public async Task UserLeft(Connection connection)
{
usersOnline.TryRemove(connection, out UserDetails user);
await Notify(hub => hub.OnUserLeft(user));
if (_usersOnline.TryRemove(connection, out var userDetails))
{
await Notify(hub => hub.OnUserLeft(userDetails));
}
}
private async Task Notify(Func<THub, Task> invocation)
{
foreach (var connection in usersOnline.Keys)
foreach (var connection in _usersOnline.Keys)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
@ -63,9 +69,9 @@ namespace ChatSample
{
await invocation(hub);
}
catch
catch (Exception ex)
{
// TODO: log
_logger.LogWarning(ex, "Presence notification failed.");
}
finally
{

View File

@ -1,7 +1,10 @@
using Microsoft.AspNetCore.SignalR;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.SignalR;
namespace ChatSample
{
@ -22,11 +25,11 @@ namespace ChatSample
_presenceManager = presenceManager;
}
public IEnumerable<UserDetails> UsersOnline
public Task<IEnumerable<UserDetails>> UsersOnline
{
get
{
return _presenceManager.UsersOnline;
return _presenceManager.UsersOnline();
}
}

View File

@ -22,7 +22,7 @@ namespace ChatSample.Hubs
Context.Connection.Dispose();
}
await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", UsersOnline);
await Clients.Client(Context.ConnectionId).InvokeAsync("SetUsersOnline", await UsersOnline);
await base.OnConnectedAsync();
}

View File

@ -1,26 +1,15 @@
using System;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Sockets;
namespace ChatSample
{
public class UserDetails
{
public UserDetails(string connectionId, string name)
{
ConnectionId = connectionId;
Name = name;
}
public string ConnectionId { get; }
public string Name { get; }
}
public interface IPresenceManager
{
IEnumerable<UserDetails> UsersOnline { get; }
Task<IEnumerable<UserDetails>> UsersOnline();
Task UserJoined(Connection connection);
Task UserLeft(Connection connection);
}

View File

@ -1,11 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
namespace ChatSample

View File

@ -0,0 +1,176 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace ChatSample
{
public class RedisPresenceManager<THub> : IDisposable, IPresenceManager where THub : HubWithPresence
{
private readonly RedisKey UsersOnlineRedisKey = "UsersOnline";
private readonly RedisChannel _redisChannel;
private IHubContext<THub> _hubContext;
private HubLifetimeManager<THub> _lifetimeManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly int _redisDatabase;
private readonly ConnectionMultiplexer _redisConnection;
private readonly ISubscriber _redisSubscriber;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<Connection, object> connections
= new ConcurrentDictionary<Connection, object>();
// TODO: subscribe and handle lifecycle events/disconnects
// TODO: handle situations where Redis shuts down
// TODO: handle situations where a server goes down and the server has zombie connections
public RedisPresenceManager(IHubContext<THub> hubContext, HubLifetimeManager<THub> lifetimeManager,
IServiceScopeFactory serviceScopeFactory, IOptions<RedisOptions> options, ILoggerFactory loggerFactory)
{
_hubContext = hubContext;
_lifetimeManager = lifetimeManager;
_serviceScopeFactory = serviceScopeFactory;
_logger = loggerFactory.CreateLogger<RedisPresenceManager<THub>>();
_redisDatabase = options.Value.Options.DefaultDatabase.GetValueOrDefault();
_redisConnection = ConnectToRedis(options.Value, _logger);
_redisSubscriber = _redisConnection.GetSubscriber();
_redisChannel = new RedisChannel((string)UsersOnlineRedisKey, RedisChannel.PatternMode.Literal);
_redisSubscriber.Subscribe(_redisChannel, (channel, value) =>
{
var stringValue = (string)value;
var user = ToUserDetails(stringValue.Substring(1));
if (stringValue[0] == '-')
{
_ = Notify(hub => hub.OnUserLeft(user));
}
else
{
_ = Notify(hub => hub.OnUserJoined(user));
}
});
}
private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
{
var loggerTextWriter = new LoggerTextWriter(logger);
return options.Factory == null
? ConnectionMultiplexer.Connect(options.Options, loggerTextWriter)
: options.Factory(loggerTextWriter);
}
public async Task<IEnumerable<UserDetails>> UsersOnline()
{
var database = _redisConnection.GetDatabase(_redisDatabase);
var usersOnline = await database.SetMembersAsync(UsersOnlineRedisKey);
return usersOnline.Select(u => ToUserDetails(u));
}
private static UserDetails ToUserDetails(string user)
{
var pos = user.IndexOf("|");
Debug.Assert(pos >= 0, "Invalid user details format");
return new UserDetails(user.Substring(0, pos), user.Substring(pos + 1));
}
public Task UserJoined(Connection connection)
{
connections.TryAdd(connection, null);
var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
// Fire and forget
_ = database.SetAddAsync(UsersOnlineRedisKey, $"{connection.ConnectionId}|{connection.User.Identity.Name}");
_ = _redisSubscriber.PublishAsync(_redisChannel, "+" + user);
return Task.CompletedTask;
}
public Task UserLeft(Connection connection)
{
connections.TryRemove(connection, out object _);
var database = _redisConnection.GetDatabase(_redisDatabase);
var user = $"{connection.ConnectionId}|{connection.User.Identity.Name}";
// Fire and forget
_ = database.SetRemoveAsync(UsersOnlineRedisKey, user);
_ = _redisSubscriber.PublishAsync(_redisChannel, "-" + user);
return Task.CompletedTask;
}
private async Task Notify(Func<THub, Task> invocation)
{
foreach (var connection in connections.Keys)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub, IClientProxy>>();
var hub = hubActivator.Create();
hub.Clients = _hubContext.Clients;
hub.Context = new HubCallerContext(connection);
hub.Groups = new GroupManager<THub>(connection, _lifetimeManager);
try
{
await invocation(hub);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Presence notification failed.");
}
finally
{
hubActivator.Release(hub);
}
}
}
}
public void Dispose()
{
_redisSubscriber.Unsubscribe(_redisChannel);
_redisConnection.Close();
_redisConnection.Dispose();
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
public LoggerTextWriter(ILogger logger)
{
_logger = logger;
}
public override Encoding Encoding => Encoding.UTF8;
public override void Write(char value)
{
}
public override void WriteLine(string value)
{
_logger.LogDebug(value);
}
}
}
}

View File

@ -54,11 +54,15 @@ namespace ChatSample
services.AddTransient<IEmailSender, AuthMessageSender>();
services.AddTransient<ISmsSender, AuthMessageSender>();
services.AddSignalR();
// To use Redis scaleout uncomment .AddRedis and register RedisPresenceManager
// instead of DefaultPresenceManager
services.AddSignalR()
// .AddRedis()
;
services.AddAuthentication();
services.AddSingleton<IPresenceManager, DefaultPresenceManager<Chat>>();
// services.AddSingleton<IPresenceManager, RedisPresenceManager<Chat>>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

View File

@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace ChatSample
{
public class UserDetails
{
public UserDetails(string connectionId, string name)
{
ConnectionId = connectionId;
Name = name;
}
public string ConnectionId { get; }
public string Name { get; }
}
}

View File

@ -19,7 +19,7 @@
let transportType = signalR.TransportType[getParameterByName('transport')] || signalR.TransportType.WebSockets;
let connection = new signalR.HubConnection(`http://${document.location.host}/chat`, 'formatType=json&format=text');
connection.on('Send', message => appendLine(message));
// connection.on('Send', message => appendLine(message));
connection.onClosed = e => {
if (e) {
appendLine('Connection closed with error: ' + e, 'red');
@ -33,27 +33,17 @@ connection.on('SetUsersOnline', function (usersOnline) {
usersOnline.forEach(user => addUserOnline(user));
});
connection.on('OnConnected', function (id, userName) {
addUserOnline(id, userName);
appendLine('User ' + userName + ' joined the chat');
});
connection.on('OnDisconnected', function (id, userName) {
appendLine('User ' + userName + ' left the chat');
document.getElementById(id).outerHTML = '';
}
connection.on('UserJoined', function (user) {
appendLine('User ' + userName + ' joined the chat');
connection.on('UserJoined', user => {
appendLine('User ' + user.Name + ' joined the chat');
addUserOnline(user);
});
connection.on('UserLeft', function (user) {
appendLine('User ' + userName + ' left the chat');
connection.on('UserLeft', user => {
appendLine('User ' + user.Name + ' left the chat');
document.getElementById(user.ConnectionId).outerHTML = '';
});
connection.on('Send', function (userName, message) {
connection.on('Send', (userName, message) => {
var nameElement = document.createElement('b');
nameElement.innerText = userName + ':';
@ -81,13 +71,13 @@ function appendLine(line, color) {
}
child.innerText = line;
document.getElementById('messages').appendChild(child);
}
};
function addUserOnline(id, userName) {
var user = document.createElement('li');
user.innerText = userName;
user.id = id;
document.getElementById('users').appendChild(user);
function addUserOnline (user) {
var userLi = document.createElement('li');
userLi.innerText = user.Name;
userLi.id = user.ConnectionId;
document.getElementById('users').appendChild(userLi);
}
function getParameterByName(name, url) {
@ -100,6 +90,6 @@ function getParameterByName(name, url) {
if (!results) return null;
if (!results[2]) return '';
return decodeURIComponent(results[2].replace(/\+/g, " "));
});
};
</script>

View File

@ -2,9 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Redis;

View File

@ -2,9 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
namespace Microsoft.AspNetCore.Builder
@ -36,10 +33,5 @@ namespace Microsoft.AspNetCore.Builder
{
_routes.MapEndpoint<HubEndPoint<THub>>(path);
}
public void MapHub<THub, TClient>(string path) where THub : Hub<TClient>
{
_routes.MapEndpoint<HubEndPoint<THub, TClient>>(path);
}
}
}

View File

@ -14,8 +14,6 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSockets();
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
services.AddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
// TODO: this breaks because of hardcoded IClientProxy
// services.AddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>));
services.AddSingleton(typeof(HubEndPoint<>), typeof(HubEndPoint<>));
services.AddSingleton<IConfigureOptions<SignalROptions>, SignalROptionsSetup>();
services.AddSingleton<JsonNetInvocationAdapter>();