Connect to Redis asynchronously (#1922)

This commit is contained in:
Mikael Mengistu 2018-04-10 23:48:11 +00:00 committed by GitHub
parent 71be5bf637
commit e2169ceda6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 82 deletions

View File

@ -13,6 +13,7 @@ using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{
@ -40,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
var protocols = GenerateProtocols(ProtocolCount).ToArray();
var options = Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer(server)
Factory = _ => Task.FromResult<IConnectionMultiplexer>(new TestConnectionMultiplexer(server))
});
var resolver = new DefaultHubProtocolResolver(protocols, NullLogger<DefaultHubProtocolResolver>.Instance);

View File

@ -28,14 +28,14 @@ namespace ChatSample
private const int ScanInterval = 5; //seconds
private const int ServerInactivityTimeout = 30; // seconds
private readonly IConnectionMultiplexer _redisConnection;
private readonly IDatabase _redisDatabase;
private readonly ISubscriber _redisSubscriber;
private IConnectionMultiplexer _redisConnection;
private IDatabase _redisDatabase;
private ISubscriber _redisSubscriber;
private const string UserAddedChannelName = "UserAdded";
private const string UserRemovedChannelName = "UserRemoved";
private readonly RedisChannel _userAddedChannel;
private readonly RedisChannel _userRemovedChannel;
private RedisChannel _userAddedChannel;
private RedisChannel _userRemovedChannel;
private readonly ILogger _logger;
@ -44,8 +44,9 @@ namespace ChatSample
private HashSet<UserDetails> _users;
private readonly object _lockObj = new object();
private readonly SemaphoreSlim _userSyncSempaphore = new SemaphoreSlim(initialCount: 1);
private readonly RedisOptions _options;
private readonly Timer _timer;
private Timer _timer;
public event Action<UserDetails[]> UsersJoined;
public event Action<UserDetails[]> UsersLeft;
@ -57,7 +58,18 @@ namespace ChatSample
_users = new HashSet<UserDetails>(_userEqualityComparer);
_logger = loggerFactory.CreateLogger<RedisUserTracker<THub>>();
(_redisConnection, _redisDatabase) = StartRedisConnection(options.Value);
_options = options.Value;
}
private async Task EstablishRedisConnection()
{
// TODO: handle connection failures
_redisConnection = await ConnectToRedis(_options, _logger);
_redisDatabase = _redisConnection.GetDatabase(_options.Options.DefaultDatabase.GetValueOrDefault());
// Register connection
_redisDatabase.SetAdd(ServerIndexRedisKey, ServerId);
_redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks);
_timer = new Timer(Scan, this, TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(ScanInterval));
@ -88,30 +100,17 @@ namespace ChatSample
});
}
private (IConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options)
{
// TODO: handle connection failures
var redisConnection = ConnectToRedis(options, _logger);
var redisDatabase = redisConnection.GetDatabase(options.Options.DefaultDatabase.GetValueOrDefault());
// Register connection
redisDatabase.SetAdd(ServerIndexRedisKey, ServerId);
redisDatabase.StringSet(LastSeenRedisKey, DateTimeOffset.UtcNow.Ticks);
return (redisConnection, redisDatabase);
}
private static IConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
private static async Task<IConnectionMultiplexer> ConnectToRedis(RedisOptions options, ILogger logger)
{
var loggerTextWriter = new LoggerTextWriter(logger);
if (options.Factory != null)
{
return options.Factory(loggerTextWriter);
return await options.Factory(loggerTextWriter);
}
if (options.Options.EndPoints.Any())
{
return ConnectionMultiplexer.Connect(options.Options, loggerTextWriter);
return await ConnectionMultiplexer.ConnectAsync(options.Options, loggerTextWriter);
}
var configurationOptions = new ConfigurationOptions();

View File

@ -171,10 +171,10 @@ namespace Microsoft.AspNetCore.SignalR
private async Task WriteSlowAsync(HubMessage message)
{
await _writeLock.WaitAsync();
try
{
// Failed to get the lock immediately when entering WriteAsync so await until it is available
await _writeLock.WaitAsync();
await WriteCore(message);
}

View File

@ -24,13 +24,14 @@ namespace Microsoft.AspNetCore.SignalR.Redis
private readonly HubConnectionStore _connections = new HubConnectionStore();
// TODO: Investigate "memory leak" entries never get removed
private readonly ConcurrentDictionary<string, GroupData> _groups = new ConcurrentDictionary<string, GroupData>(StringComparer.Ordinal);
private readonly IConnectionMultiplexer _redisServerConnection;
private readonly ISubscriber _bus;
private IConnectionMultiplexer _redisServerConnection;
private ISubscriber _bus;
private readonly ILogger _logger;
private readonly RedisOptions _options;
private readonly RedisChannels _channels;
private readonly string _serverName = GenerateServerName();
private readonly RedisProtocol _protocol;
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);
private readonly AckHandler _ackHandler;
private int _internalId;
@ -45,51 +46,12 @@ namespace Microsoft.AspNetCore.SignalR.Redis
_channels = new RedisChannels(typeof(THub).FullName);
_protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);
var writer = new LoggerTextWriter(logger);
RedisLog.ConnectingToEndpoints(_logger, options.Value.Options.EndPoints, _serverName);
_redisServerConnection = _options.Connect(writer);
_redisServerConnection.ConnectionRestored += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionRestored(_logger);
};
_redisServerConnection.ConnectionFailed += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionFailed(_logger, e.Exception);
};
if (_redisServerConnection.IsConnected)
{
RedisLog.Connected(_logger);
}
else
{
RedisLog.NotConnected(_logger);
}
_bus = _redisServerConnection.GetSubscriber();
SubscribeToAll();
SubscribeToGroupManagementChannel();
SubscribeToAckChannel();
}
public override Task OnConnectedAsync(HubConnectionContext connection)
public override async Task OnConnectedAsync(HubConnectionContext connection)
{
await EnsureRedisServerConnection();
var feature = new RedisFeature();
connection.Features.Set<IRedisFeature>(feature);
@ -106,7 +68,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
userTask = SubscribeToUser(connection, redisSubscriptions);
}
return Task.WhenAll(connectionTask, userTask);
await Task.WhenAll(connectionTask, userTask);
}
public override Task OnDisconnectedAsync(HubConnectionContext connection)
@ -186,7 +148,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
public override async Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
{
if (groupName == null)
{
@ -194,7 +156,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
}
var message = _protocol.WriteInvocation(methodName, args, excludedIds);
return PublishAsync(_channels.Group(groupName), message);
await PublishAsync(_channels.Group(groupName), message);
}
public override Task SendUserAsync(string userId, string methodName, object[] args)
@ -307,10 +269,11 @@ namespace Microsoft.AspNetCore.SignalR.Redis
return Task.CompletedTask;
}
private Task PublishAsync(string channel, byte[] payload)
private async Task PublishAsync(string channel, byte[] payload)
{
await EnsureRedisServerConnection();
RedisLog.PublishToChannel(_logger, channel);
return _bus.PublishAsync(channel, payload);
await _bus.PublishAsync(channel, payload);
}
private async Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
@ -405,8 +368,8 @@ namespace Microsoft.AspNetCore.SignalR.Redis
public void Dispose()
{
_bus.UnsubscribeAll();
_redisServerConnection.Dispose();
_bus?.UnsubscribeAll();
_redisServerConnection?.Dispose();
_ackHandler.Dispose();
}
@ -541,6 +504,63 @@ namespace Microsoft.AspNetCore.SignalR.Redis
});
}
private async Task EnsureRedisServerConnection()
{
if (_redisServerConnection == null)
{
await _connectionLock.WaitAsync();
try
{
if (_redisServerConnection == null)
{
var writer = new LoggerTextWriter(_logger);
_redisServerConnection = await _options.ConnectAsync(writer);
_bus = _redisServerConnection.GetSubscriber();
_redisServerConnection.ConnectionRestored += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionRestored(_logger);
};
_redisServerConnection.ConnectionFailed += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionFailed(_logger, e.Exception);
};
if (_redisServerConnection.IsConnected)
{
RedisLog.Connected(_logger);
}
else
{
RedisLog.NotConnected(_logger);
}
SubscribeToAll();
SubscribeToGroupManagementChannel();
SubscribeToAckChannel();
}
}
finally
{
_connectionLock.Release();
}
}
}
private static string GenerateServerName()
{
// Use the machine name for convenient diagnostics, but add a guid to make it unique.

View File

@ -4,6 +4,7 @@
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Redis
@ -16,12 +17,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis
AbortOnConnectFail = false
};
public Func<TextWriter, IConnectionMultiplexer> Factory { get; set; }
public Func<TextWriter, Task<IConnectionMultiplexer>> Factory { get; set; }
// TODO: Async
internal IConnectionMultiplexer Connect(TextWriter log)
internal async Task<IConnectionMultiplexer> ConnectAsync(TextWriter log)
{
if (Factory == null)
// Factory is publically settable. Assigning to a local variable before null check for thread safety.
var localFactory = Factory;
if (localFactory == null)
{
// REVIEW: Should we do this?
if (Options.EndPoints.Count == 0)
@ -30,10 +32,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis
Options.SetDefaultPorts();
}
return ConnectionMultiplexer.Connect(Options, log);
return await ConnectionMultiplexer.ConnectAsync(Options, log);
}
return Factory(log);
return await localFactory(log);
}
}
}

View File

@ -550,7 +550,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
private RedisHubLifetimeManager<MyHub> CreateLifetimeManager(TestRedisServer server, MessagePackHubProtocolOptions messagePackOptions = null, JsonHubProtocolOptions jsonOptions = null)
{
var options = new RedisOptions() { Factory = t => new TestConnectionMultiplexer(server) };
var options = new RedisOptions() { Factory = async (t) => await Task.FromResult(new TestConnectionMultiplexer(server)) };
messagePackOptions = messagePackOptions ?? new MessagePackHubProtocolOptions();
jsonOptions = jsonOptions ?? new JsonHubProtocolOptions();