Get rid of pub sub abstraction and add redis impl

This commit is contained in:
David Fowler 2016-11-02 01:21:51 -07:00
parent 4221db3890
commit 915ebbda6b
4 changed files with 94 additions and 74 deletions

View File

@ -1,22 +1,30 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using SocketsSample.Hubs;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace SocketsSample.EndPoints.Hubs
{
public class PubSubHubLifetimeManager<THub> : HubLifetimeManager<THub>
public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable
{
private readonly IPubSub _bus;
private readonly InvocationAdapterRegistry _registry;
private readonly ConnectionMultiplexer _redis;
private readonly ISubscriber _bus;
private readonly ILoggerFactory _loggerFactory;
public PubSubHubLifetimeManager(IPubSub bus, InvocationAdapterRegistry registry)
public RedisHubLifetimeManager(InvocationAdapterRegistry registry, ILoggerFactory loggerFactory)
{
_bus = bus;
var writer = new LoggerTextWriter(loggerFactory.CreateLogger<RedisHubLifetimeManager<THub>>());
_loggerFactory = loggerFactory;
_redis = ConnectionMultiplexer.Connect("localhost", writer);
_bus = _redis.GetSubscriber();
_registry = registry;
}
@ -28,7 +36,7 @@ namespace SocketsSample.EndPoints.Hubs
Arguments = args
};
return _bus.Publish(typeof(THub).Name, message);
return PublishAsync(typeof(THub).Name, message);
}
public override Task InvokeConnection(string connectionId, string methodName, params object[] args)
@ -39,7 +47,7 @@ namespace SocketsSample.EndPoints.Hubs
Arguments = args
};
return _bus.Publish(typeof(THub) + "." + connectionId, message);
return PublishAsync(typeof(THub) + "." + connectionId, message);
}
public override Task InvokeGroup(string groupName, string methodName, params object[] args)
@ -50,7 +58,7 @@ namespace SocketsSample.EndPoints.Hubs
Arguments = args
};
return _bus.Publish(typeof(THub) + "." + groupName, message);
return PublishAsync(typeof(THub) + "." + groupName, message);
}
public override Task InvokeUser(string userId, string methodName, params object[] args)
@ -61,7 +69,21 @@ namespace SocketsSample.EndPoints.Hubs
Arguments = args
};
return _bus.Publish(typeof(THub) + "." + userId, message);
return PublishAsync(typeof(THub) + "." + userId, message);
}
private Task PublishAsync(string channel, InvocationDescriptor message)
{
// TODO: What format??
var invocationAdapter = _registry.GetInvocationAdapter("json");
// BAD
using (var ms = new MemoryStream())
{
invocationAdapter.WriteInvocationDescriptor(message, ms);
return _bus.PublishAsync(channel, ms.ToArray());
}
}
public override Task OnConnectedAsync(Connection connection)
@ -87,6 +109,8 @@ namespace SocketsSample.EndPoints.Hubs
}
}
connection.Metadata.Get<ConnectionMultiplexer>("redis")?.Dispose();
return Task.CompletedTask;
}
@ -109,15 +133,67 @@ namespace SocketsSample.EndPoints.Hubs
}
}
private IDisposable Subscribe(string signal, Connection connection)
private IDisposable Subscribe(string channel, Connection connection)
{
return _bus.Subscribe(signal, message =>
var muxer = connection.Metadata.GetOrAdd("redis", k =>
{
var invocationAdapter = _registry.GetInvocationAdapter((string)connection.Metadata["formatType"]);
var logger = _loggerFactory.CreateLogger("REDIS_" + connection.ConnectionId);
return ConnectionMultiplexer.Connect("localhost", new LoggerTextWriter(logger));
});
return invocationAdapter.WriteInvocationDescriptor((InvocationDescriptor)message, connection.Channel.GetStream());
var subscriber = muxer.GetSubscriber();
subscriber.SubscribeAsync(channel, (c, data) =>
{
connection.Channel.Output.WriteAsync((byte[])data);
});
return new DisposableAction(() =>
{
subscriber.Unsubscribe(channel);
});
}
}
public void Dispose()
{
_redis.Dispose();
}
private class DisposableAction : IDisposable
{
private Action _action;
public DisposableAction(Action action)
{
_action = action;
}
public void Dispose()
{
Interlocked.Exchange(ref _action, () => { }).Invoke();
}
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
public LoggerTextWriter(ILogger logger)
{
_logger = logger;
}
public override Encoding Encoding => Encoding.UTF8;
public override void Write(char value)
{
}
public override void WriteLine(string value)
{
_logger.LogDebug(value);
}
}
}
}

View File

@ -1,57 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace SocketsSample.Hubs
{
public interface IPubSub
{
IDisposable Subscribe(string topic, Func<object, Task> callback);
Task Publish(string topic, object data);
}
public class Bus : IPubSub
{
private readonly ConcurrentDictionary<string, List<Func<object, Task>>> _subscriptions = new ConcurrentDictionary<string, List<Func<object, Task>>>();
public IDisposable Subscribe(string key, Func<object, Task> observer)
{
var subscriptions = _subscriptions.GetOrAdd(key, _ => new List<Func<object, Task>>());
subscriptions.Add(observer);
return new DisposableAction(() =>
{
subscriptions.Remove(observer);
});
}
public async Task Publish(string key, object data)
{
List<Func<object, Task>> subscriptions;
if (_subscriptions.TryGetValue(key, out subscriptions))
{
foreach (var c in subscriptions)
{
await c(data);
}
}
}
private class DisposableAction : IDisposable
{
private Action _action;
public DisposableAction(Action action)
{
_action = action;
}
public void Dispose()
{
Interlocked.Exchange(ref _action, () => { }).Invoke();
}
}
}
}

View File

@ -15,8 +15,8 @@ namespace SocketsSample
{
services.AddRouting();
services.AddSingleton<IPubSub, Bus>();
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(PubSubHubLifetimeManager<>));
services.AddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
// services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisHubLifetimeManager<>));
services.AddSingleton(typeof(HubEndPoint<>), typeof(HubEndPoint<>));
services.AddSingleton(typeof(RpcEndpoint<>), typeof(RpcEndpoint<>));

View File

@ -8,6 +8,7 @@
"type": "platform"
},
"Newtonsoft.Json": "9.0.1",
"StackExchange.Redis": "1.1.*",
"Microsoft.AspNetCore.Diagnostics": "1.1.0-*",
"Microsoft.AspNetCore.StaticFiles": "1.1.0-*",
"Microsoft.AspNetCore.Server.IISIntegration": "1.1.0-*",