diff --git a/samples/SocketsSample/EndPoints/Hubs/PubSubHubLifetimeManager.cs b/samples/SocketsSample/EndPoints/Hubs/RedisHubLifetimeManager.cs similarity index 50% rename from samples/SocketsSample/EndPoints/Hubs/PubSubHubLifetimeManager.cs rename to samples/SocketsSample/EndPoints/Hubs/RedisHubLifetimeManager.cs index 1573104b02..e168275b53 100644 --- a/samples/SocketsSample/EndPoints/Hubs/PubSubHubLifetimeManager.cs +++ b/samples/SocketsSample/EndPoints/Hubs/RedisHubLifetimeManager.cs @@ -1,22 +1,30 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; +using System.IO; +using System.Text; +using System.Threading; using System.Threading.Tasks; using Channels; using Microsoft.AspNetCore.Sockets; -using SocketsSample.Hubs; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; namespace SocketsSample.EndPoints.Hubs { - public class PubSubHubLifetimeManager : HubLifetimeManager + public class RedisHubLifetimeManager : HubLifetimeManager, IDisposable { - private readonly IPubSub _bus; private readonly InvocationAdapterRegistry _registry; + private readonly ConnectionMultiplexer _redis; + private readonly ISubscriber _bus; + private readonly ILoggerFactory _loggerFactory; - public PubSubHubLifetimeManager(IPubSub bus, InvocationAdapterRegistry registry) + public RedisHubLifetimeManager(InvocationAdapterRegistry registry, ILoggerFactory loggerFactory) { - _bus = bus; + var writer = new LoggerTextWriter(loggerFactory.CreateLogger>()); + _loggerFactory = loggerFactory; + _redis = ConnectionMultiplexer.Connect("localhost", writer); + _bus = _redis.GetSubscriber(); _registry = registry; } @@ -28,7 +36,7 @@ namespace SocketsSample.EndPoints.Hubs Arguments = args }; - return _bus.Publish(typeof(THub).Name, message); + return PublishAsync(typeof(THub).Name, message); } public override Task InvokeConnection(string connectionId, string methodName, params object[] args) @@ -39,7 +47,7 @@ namespace SocketsSample.EndPoints.Hubs Arguments = args }; - return _bus.Publish(typeof(THub) + "." + connectionId, message); + return PublishAsync(typeof(THub) + "." + connectionId, message); } public override Task InvokeGroup(string groupName, string methodName, params object[] args) @@ -50,7 +58,7 @@ namespace SocketsSample.EndPoints.Hubs Arguments = args }; - return _bus.Publish(typeof(THub) + "." + groupName, message); + return PublishAsync(typeof(THub) + "." + groupName, message); } public override Task InvokeUser(string userId, string methodName, params object[] args) @@ -61,7 +69,21 @@ namespace SocketsSample.EndPoints.Hubs Arguments = args }; - return _bus.Publish(typeof(THub) + "." + userId, message); + return PublishAsync(typeof(THub) + "." + userId, message); + } + + private Task PublishAsync(string channel, InvocationDescriptor message) + { + // TODO: What format?? + var invocationAdapter = _registry.GetInvocationAdapter("json"); + + // BAD + using (var ms = new MemoryStream()) + { + invocationAdapter.WriteInvocationDescriptor(message, ms); + + return _bus.PublishAsync(channel, ms.ToArray()); + } } public override Task OnConnectedAsync(Connection connection) @@ -87,6 +109,8 @@ namespace SocketsSample.EndPoints.Hubs } } + connection.Metadata.Get("redis")?.Dispose(); + return Task.CompletedTask; } @@ -109,15 +133,67 @@ namespace SocketsSample.EndPoints.Hubs } } - private IDisposable Subscribe(string signal, Connection connection) + private IDisposable Subscribe(string channel, Connection connection) { - return _bus.Subscribe(signal, message => + var muxer = connection.Metadata.GetOrAdd("redis", k => { - var invocationAdapter = _registry.GetInvocationAdapter((string)connection.Metadata["formatType"]); + var logger = _loggerFactory.CreateLogger("REDIS_" + connection.ConnectionId); + return ConnectionMultiplexer.Connect("localhost", new LoggerTextWriter(logger)); + }); - return invocationAdapter.WriteInvocationDescriptor((InvocationDescriptor)message, connection.Channel.GetStream()); + var subscriber = muxer.GetSubscriber(); + + subscriber.SubscribeAsync(channel, (c, data) => + { + connection.Channel.Output.WriteAsync((byte[])data); + }); + + return new DisposableAction(() => + { + subscriber.Unsubscribe(channel); }); } - } + public void Dispose() + { + _redis.Dispose(); + } + + private class DisposableAction : IDisposable + { + private Action _action; + + public DisposableAction(Action action) + { + _action = action; + } + + public void Dispose() + { + Interlocked.Exchange(ref _action, () => { }).Invoke(); + } + } + + private class LoggerTextWriter : TextWriter + { + private readonly ILogger _logger; + + public LoggerTextWriter(ILogger logger) + { + _logger = logger; + } + + public override Encoding Encoding => Encoding.UTF8; + + public override void Write(char value) + { + + } + + public override void WriteLine(string value) + { + _logger.LogDebug(value); + } + } + } } diff --git a/samples/SocketsSample/Hubs/PubSub.cs b/samples/SocketsSample/Hubs/PubSub.cs deleted file mode 100644 index 9bbed023b3..0000000000 --- a/samples/SocketsSample/Hubs/PubSub.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace SocketsSample.Hubs -{ - public interface IPubSub - { - IDisposable Subscribe(string topic, Func callback); - Task Publish(string topic, object data); - } - - public class Bus : IPubSub - { - private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); - - public IDisposable Subscribe(string key, Func observer) - { - var subscriptions = _subscriptions.GetOrAdd(key, _ => new List>()); - subscriptions.Add(observer); - - return new DisposableAction(() => - { - subscriptions.Remove(observer); - }); - } - - public async Task Publish(string key, object data) - { - List> subscriptions; - if (_subscriptions.TryGetValue(key, out subscriptions)) - { - foreach (var c in subscriptions) - { - await c(data); - } - } - } - - private class DisposableAction : IDisposable - { - private Action _action; - - public DisposableAction(Action action) - { - _action = action; - } - - public void Dispose() - { - Interlocked.Exchange(ref _action, () => { }).Invoke(); - } - } - } -} diff --git a/samples/SocketsSample/Startup.cs b/samples/SocketsSample/Startup.cs index daef8b93ad..14a32e9843 100644 --- a/samples/SocketsSample/Startup.cs +++ b/samples/SocketsSample/Startup.cs @@ -15,8 +15,8 @@ namespace SocketsSample { services.AddRouting(); - services.AddSingleton(); - services.AddSingleton(typeof(HubLifetimeManager<>), typeof(PubSubHubLifetimeManager<>)); + services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>)); + // services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisHubLifetimeManager<>)); services.AddSingleton(typeof(HubEndPoint<>), typeof(HubEndPoint<>)); services.AddSingleton(typeof(RpcEndpoint<>), typeof(RpcEndpoint<>)); diff --git a/samples/SocketsSample/project.json b/samples/SocketsSample/project.json index 4b87fd5135..ba340722f5 100644 --- a/samples/SocketsSample/project.json +++ b/samples/SocketsSample/project.json @@ -8,6 +8,7 @@ "type": "platform" }, "Newtonsoft.Json": "9.0.1", + "StackExchange.Redis": "1.1.*", "Microsoft.AspNetCore.Diagnostics": "1.1.0-*", "Microsoft.AspNetCore.StaticFiles": "1.1.0-*", "Microsoft.AspNetCore.Server.IISIntegration": "1.1.0-*",