using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Channels;

namespace Microsoft.AspNetCore.Sockets
{
    /// <summary>
    /// A unit of data delivered through the <see cref="Bus"/>.
    /// </summary>
    public class Message
    {
        /// <summary>
        /// The buffer carried by this message.
        /// </summary>
        public ReadableBuffer Payload { get; set; }
    }

    /// <summary>
    /// A simple in-process publish/subscribe hub: observers register under a string key and
    /// every message published to that key is handed to each registered observer in turn.
    /// </summary>
    public class Bus
    {
        // Maps a key to the observers currently subscribed to it. The dictionary handles concurrent
        // lookups/insertions; each per-key list is guarded by locking on the list instance itself,
        // which is safe because the list never escapes this class.
        private readonly ConcurrentDictionary<string, List<Func<Message, Task>>> _subscriptions =
            new ConcurrentDictionary<string, List<Func<Message, Task>>>();

        /// <summary>
        /// Registers <paramref name="observer"/> to receive every message published under
        /// <paramref name="key"/>.
        /// </summary>
        /// <param name="key">The key to subscribe to.</param>
        /// <param name="observer">Callback invoked (and awaited) for each published message.</param>
        /// <returns>
        /// A handle that removes the subscription when disposed. Disposing more than once is a no-op.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="key"/> or <paramref name="observer"/> is <c>null</c>.
        /// </exception>
        public IDisposable Subscribe(string key, Func<Message, Task> observer)
        {
            // Reject a null observer up front; otherwise it would only surface later as a
            // NullReferenceException in the middle of Publish, breaking delivery to other observers.
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            var connections = _subscriptions.GetOrAdd(key, _ => new List<Func<Message, Task>>());

            // List<T> is not safe for concurrent writers, so every mutation takes the list's lock.
            lock (connections)
            {
                connections.Add(observer);
            }

            return new DisposableAction(() =>
            {
                lock (connections)
                {
                    connections.Remove(observer);
                }
            });
        }

        /// <summary>
        /// Delivers <paramref name="message"/> to every observer subscribed to <paramref name="key"/>,
        /// awaiting each observer before invoking the next. Does nothing when the key has never been
        /// subscribed to.
        /// </summary>
        /// <remarks>
        /// Observers are taken from a snapshot made when the publish starts, so an observer added or
        /// removed while the publish is in flight does not affect that publish: a newly added observer
        /// is skipped, and a just-removed one may still receive the message.
        /// </remarks>
        /// <param name="key">The key to publish under.</param>
        /// <param name="message">The message handed to each observer.</param>
        /// <returns>A task that completes once every observer in the snapshot has completed.</returns>
        public async Task Publish(string key, Message message)
        {
            List<Func<Message, Task>> connections;
            if (!_subscriptions.TryGetValue(key, out connections))
            {
                return;
            }

            // Copy under the lock and iterate the copy: enumerating the live list across awaits would
            // throw InvalidOperationException as soon as any observer subscribes or unsubscribes
            // (even from within its own callback) while the publish is running.
            Func<Message, Task>[] snapshot;
            lock (connections)
            {
                snapshot = connections.ToArray();
            }

            foreach (var observer in snapshot)
            {
                await observer(message);
            }
        }

        /// <summary>
        /// An <see cref="IDisposable"/> that runs a supplied action the first time it is disposed.
        /// </summary>
        private sealed class DisposableAction : IDisposable
        {
            private Action _action;

            public DisposableAction(Action action)
            {
                _action = action;
            }

            public void Dispose()
            {
                // Atomically swap in a no-op so the original action runs at most once,
                // even if Dispose is called repeatedly or from several threads.
                Interlocked.Exchange(ref _action, () => { }).Invoke();
            }
        }
    }
}