From e2169ceda636797ebe76ba1a26abd0ae59a0802e Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Tue, 10 Apr 2018 23:48:11 +0000 Subject: [PATCH] Connect to Redis asynchronously (#1922) --- .../RedisHubLifetimeManagerBenchmark.cs | 3 +- samples/ChatSample/RedisUserTracker.cs | 45 ++++--- .../HubConnectionContext.cs | 2 +- .../RedisHubLifetimeManager.cs | 120 ++++++++++-------- .../RedisOptions.cs | 14 +- .../RedisHubLifetimeManagerTests.cs | 2 +- 6 files changed, 104 insertions(+), 82 deletions(-) diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/RedisHubLifetimeManagerBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/RedisHubLifetimeManagerBenchmark.cs index 31a8f268a5..7835c9f39a 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/RedisHubLifetimeManagerBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/RedisHubLifetimeManagerBenchmark.cs @@ -13,6 +13,7 @@ using Microsoft.AspNetCore.SignalR.Redis; using Microsoft.AspNetCore.SignalR.Tests; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using StackExchange.Redis; namespace Microsoft.AspNetCore.SignalR.Microbenchmarks { @@ -40,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks var protocols = GenerateProtocols(ProtocolCount).ToArray(); var options = Options.Create(new RedisOptions() { - Factory = t => new TestConnectionMultiplexer(server) + Factory = _ => Task.FromResult(new TestConnectionMultiplexer(server)) }); var resolver = new DefaultHubProtocolResolver(protocols, NullLogger.Instance); diff --git a/samples/ChatSample/RedisUserTracker.cs b/samples/ChatSample/RedisUserTracker.cs index 83113a1628..40c811d109 100644 --- a/samples/ChatSample/RedisUserTracker.cs +++ b/samples/ChatSample/RedisUserTracker.cs @@ -28,14 +28,14 @@ namespace ChatSample private const int ScanInterval = 5; //seconds private const int ServerInactivityTimeout = 30; // seconds - private readonly IConnectionMultiplexer _redisConnection; - private readonly IDatabase _redisDatabase; - private readonly ISubscriber _redisSubscriber; + private IConnectionMultiplexer _redisConnection; + private IDatabase _redisDatabase; + private ISubscriber _redisSubscriber; private const string UserAddedChannelName = "UserAdded"; private const string UserRemovedChannelName = "UserRemoved"; - private readonly RedisChannel _userAddedChannel; - private readonly RedisChannel _userRemovedChannel; + private RedisChannel _userAddedChannel; + private RedisChannel _userRemovedChannel; private readonly ILogger _logger; @@ -44,8 +44,9 @@ namespace ChatSample private HashSet _users; private readonly object _lockObj = new object(); private readonly SemaphoreSlim _userSyncSempaphore = new SemaphoreSlim(initialCount: 1); + private readonly RedisOptions _options; - private readonly Timer _timer; + private Timer _timer; public event Action UsersJoined; public event Action UsersLeft; @@ -57,7 +58,18 @@ namespace ChatSample _users = new HashSet(_userEqualityComparer); _logger = loggerFactory.CreateLogger>(); - (_redisConnection, _redisDatabase) = StartRedisConnection(options.Value); + _options = options.Value; + } + + private async Task EstablishRedisConnection() + { + // TODO: handle connection failures + _redisConnection = await ConnectToRedis(_options, _logger); + _redisDatabase = _redisConnection.GetDatabase(_options.Options.DefaultDatabase.GetValueOrDefault()); + + // Register connection + _redisDatabase.SetAdd(ServerIndexRedisKey, ServerId); + _redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks); _timer = new Timer(Scan, this, TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(ScanInterval)); @@ -88,30 +100,17 @@ namespace ChatSample }); } - private (IConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options) - { - // TODO: handle connection failures - var redisConnection = ConnectToRedis(options, _logger); - var redisDatabase = redisConnection.GetDatabase(options.Options.DefaultDatabase.GetValueOrDefault()); - - // Register connection - redisDatabase.SetAdd(ServerIndexRedisKey, ServerId); - redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks); - - return (redisConnection, redisDatabase); - } - - private static IConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) + private static async Task ConnectToRedis(RedisOptions options, ILogger logger) { var loggerTextWriter = new LoggerTextWriter(logger); if (options.Factory != null) { - return options.Factory(loggerTextWriter); + return await options.Factory(loggerTextWriter); } if (options.Options.EndPoints.Any()) { - return ConnectionMultiplexer.Connect(options.Options, loggerTextWriter); + return await ConnectionMultiplexer.ConnectAsync(options.Options, loggerTextWriter); } var configurationOptions = new ConfigurationOptions(); diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs index 032e39c788..16b592f732 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs @@ -171,10 +171,10 @@ namespace Microsoft.AspNetCore.SignalR private async Task WriteSlowAsync(HubMessage message) { + await _writeLock.WaitAsync(); try { // Failed to get the lock immediately when entering WriteAsync so await until it is available - await _writeLock.WaitAsync(); await WriteCore(message); } diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 7623f11bf7..b10b2f4eac 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -24,13 +24,14 @@ namespace Microsoft.AspNetCore.SignalR.Redis private readonly HubConnectionStore _connections = new HubConnectionStore(); // TODO: Investigate "memory leak" entries never get removed private readonly ConcurrentDictionary _groups = new ConcurrentDictionary(StringComparer.Ordinal); - private readonly IConnectionMultiplexer _redisServerConnection; - private readonly ISubscriber _bus; + private IConnectionMultiplexer _redisServerConnection; + private ISubscriber _bus; private readonly ILogger _logger; private readonly RedisOptions _options; private readonly RedisChannels _channels; private readonly string _serverName = GenerateServerName(); private readonly RedisProtocol _protocol; + private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1); private readonly AckHandler _ackHandler; private int _internalId; @@ -45,51 +46,12 @@ namespace Microsoft.AspNetCore.SignalR.Redis _channels = new RedisChannels(typeof(THub).FullName); _protocol = new RedisProtocol(hubProtocolResolver.AllProtocols); - var writer = new LoggerTextWriter(logger); RedisLog.ConnectingToEndpoints(_logger, options.Value.Options.EndPoints, _serverName); - _redisServerConnection = _options.Connect(writer); - - _redisServerConnection.ConnectionRestored += (_, e) => - { - // We use the subscription connection type - // Ignore messages from the interactive connection (avoids duplicates) - if (e.ConnectionType == ConnectionType.Interactive) - { - return; - } - - RedisLog.ConnectionRestored(_logger); - }; - - _redisServerConnection.ConnectionFailed += (_, e) => - { - // We use the subscription connection type - // Ignore messages from the interactive connection (avoids duplicates) - if (e.ConnectionType == ConnectionType.Interactive) - { - return; - } - - RedisLog.ConnectionFailed(_logger, e.Exception); - }; - - if (_redisServerConnection.IsConnected) - { - RedisLog.Connected(_logger); - } - else - { - RedisLog.NotConnected(_logger); - } - _bus = _redisServerConnection.GetSubscriber(); - - SubscribeToAll(); - SubscribeToGroupManagementChannel(); - SubscribeToAckChannel(); } - public override Task OnConnectedAsync(HubConnectionContext connection) + public override async Task OnConnectedAsync(HubConnectionContext connection) { + await EnsureRedisServerConnection(); var feature = new RedisFeature(); connection.Features.Set(feature); @@ -106,7 +68,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis userTask = SubscribeToUser(connection, redisSubscriptions); } - return Task.WhenAll(connectionTask, userTask); + await Task.WhenAll(connectionTask, userTask); } public override Task OnDisconnectedAsync(HubConnectionContext connection) @@ -186,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis return PublishAsync(_channels.Group(groupName), message); } - public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds) + public override async Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds) { if (groupName == null) { @@ -194,7 +156,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis } var message = _protocol.WriteInvocation(methodName, args, excludedIds); - return PublishAsync(_channels.Group(groupName), message); + await PublishAsync(_channels.Group(groupName), message); } public override Task SendUserAsync(string userId, string methodName, object[] args) @@ -307,10 +269,11 @@ namespace Microsoft.AspNetCore.SignalR.Redis return Task.CompletedTask; } - private Task PublishAsync(string channel, byte[] payload) + private async Task PublishAsync(string channel, byte[] payload) { + await EnsureRedisServerConnection(); RedisLog.PublishToChannel(_logger, channel); - return _bus.PublishAsync(channel, payload); + await _bus.PublishAsync(channel, payload); } private async Task AddGroupAsyncCore(HubConnectionContext connection, string groupName) @@ -405,8 +368,8 @@ namespace Microsoft.AspNetCore.SignalR.Redis public void Dispose() { - _bus.UnsubscribeAll(); - _redisServerConnection.Dispose(); + _bus?.UnsubscribeAll(); + _redisServerConnection?.Dispose(); _ackHandler.Dispose(); } @@ -541,6 +504,63 @@ namespace Microsoft.AspNetCore.SignalR.Redis }); } + private async Task EnsureRedisServerConnection() + { + if (_redisServerConnection == null) + { + await _connectionLock.WaitAsync(); + try + { + if (_redisServerConnection == null) + { + var writer = new LoggerTextWriter(_logger); + _redisServerConnection = await _options.ConnectAsync(writer); + _bus = _redisServerConnection.GetSubscriber(); + _redisServerConnection.ConnectionRestored += (_, e) => + { + // We use the subscription connection type + // Ignore messages from the interactive connection (avoids duplicates) + if (e.ConnectionType == ConnectionType.Interactive) + { + return; + } + + RedisLog.ConnectionRestored(_logger); + }; + + _redisServerConnection.ConnectionFailed += (_, e) => + { + // We use the subscription connection type + // Ignore messages from the interactive connection (avoids duplicates) + if (e.ConnectionType == ConnectionType.Interactive) + { + return; + } + + RedisLog.ConnectionFailed(_logger, e.Exception); + }; + + if (_redisServerConnection.IsConnected) + { + RedisLog.Connected(_logger); + } + else + { + RedisLog.NotConnected(_logger); + } + + SubscribeToAll(); + SubscribeToGroupManagementChannel(); + SubscribeToAckChannel(); + } + } + finally + { + _connectionLock.Release(); + } + } + } + private static string GenerateServerName() { // Use the machine name for convenient diagnostics, but add a guid to make it unique. diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs index 89565f40fd..330c1e5650 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs @@ -4,6 +4,7 @@ using System; using System.IO; using System.Net; +using System.Threading.Tasks; using StackExchange.Redis; namespace Microsoft.AspNetCore.SignalR.Redis @@ -16,12 +17,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis AbortOnConnectFail = false }; - public Func Factory { get; set; } + public Func> Factory { get; set; } - // TODO: Async - internal IConnectionMultiplexer Connect(TextWriter log) + internal async Task ConnectAsync(TextWriter log) { - if (Factory == null) + // Factory is publically settable. Assigning to a local variable before null check for thread safety. + var localFactory = Factory; + if (localFactory == null) { // REVIEW: Should we do this? if (Options.EndPoints.Count == 0) @@ -30,10 +32,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis Options.SetDefaultPorts(); } - return ConnectionMultiplexer.Connect(Options, log); + return await ConnectionMultiplexer.ConnectAsync(Options, log); } - return Factory(log); + return await localFactory(log); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs index a7092cac8b..1964ffd6f7 100644 --- a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs @@ -550,7 +550,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests private RedisHubLifetimeManager CreateLifetimeManager(TestRedisServer server, MessagePackHubProtocolOptions messagePackOptions = null, JsonHubProtocolOptions jsonOptions = null) { - var options = new RedisOptions() { Factory = t => new TestConnectionMultiplexer(server) }; + var options = new RedisOptions() { Factory = async (t) => await Task.FromResult(new TestConnectionMultiplexer(server)) }; messagePackOptions = messagePackOptions ?? new MessagePackHubProtocolOptions(); jsonOptions = jsonOptions ?? new JsonHubProtocolOptions();