diff --git a/SignalR.sln b/SignalR.sln index e69993bdd6..6fd749c6f1 100644 --- a/SignalR.sln +++ b/SignalR.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26823.1 +VisualStudioVersion = 15.0.26919.1 MinimumVisualStudioVersion = 15.0.26730.03 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-5398-4884-87E4-B816698CDE65}" ProjectSection(SolutionItems) = preProject @@ -85,7 +85,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Socket EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmarks", "benchmarks", "{8A4582C8-DC59-4B61-BCE7-119FBAA99EFB}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Client", "src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj", "{BE982591-F4BB-42D9-ABD4-A5D44C65971E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Client", "src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj", "{BE982591-F4BB-42D9-ABD4-A5D44C65971E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Redis.Tests", "test\Microsoft.AspNetCore.SignalR.Redis.Tests\Microsoft.AspNetCore.SignalR.Redis.Tests.csproj", "{0B083AE6-86CA-4E0B-AE02-59154D1FD005}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -189,6 +191,10 @@ Global {BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Debug|Any CPU.Build.0 = Debug|Any CPU {BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.ActiveCfg = Release|Any CPU {BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.Build.0 = Release|Any CPU + {0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -219,6 +225,7 @@ Global {B0243F99-2D3F-4CC6-AD71-E3F891B64724} = {DA69F624-5398-4884-87E4-B816698CDE65} {E081EE41-D95F-4AD2-BC0B-4B562C0A2A47} = {DA69F624-5398-4884-87E4-B816698CDE65} {BE982591-F4BB-42D9-ABD4-A5D44C65971E} = {DA69F624-5398-4884-87E4-B816698CDE65} + {0B083AE6-86CA-4E0B-AE02-59154D1FD005} = {6A35B453-52EC-48AF-89CA-D4A69800F131} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7945A4E4-ACDB-4F6E-95CA-6AC6E7C2CD59} diff --git a/samples/ChatSample/RedisUserTracker.cs b/samples/ChatSample/RedisUserTracker.cs index 0eee499852..83113a1628 100644 --- a/samples/ChatSample/RedisUserTracker.cs +++ b/samples/ChatSample/RedisUserTracker.cs @@ -28,7 +28,7 @@ namespace ChatSample private const int ScanInterval = 5; //seconds private const int ServerInactivityTimeout = 30; // seconds - private readonly ConnectionMultiplexer _redisConnection; + private readonly IConnectionMultiplexer _redisConnection; private readonly IDatabase _redisDatabase; private readonly ISubscriber _redisSubscriber; @@ -88,7 +88,7 @@ namespace ChatSample }); } - private (ConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options) + private (IConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options) { // TODO: handle connection failures var redisConnection = ConnectToRedis(options, _logger); @@ -101,7 +101,7 @@ namespace ChatSample return (redisConnection, redisDatabase); } - private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) + private static IConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger) { var loggerTextWriter = new LoggerTextWriter(logger); if (options.Factory != null) diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs index 0d230695e3..9075e4764c 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis private readonly HubConnectionList _connections = new HubConnectionList(); // TODO: Investigate "memory leak" entries never get removed private readonly ConcurrentDictionary _groups = new ConcurrentDictionary(); - private readonly ConnectionMultiplexer _redisServerConnection; + private readonly IConnectionMultiplexer _redisServerConnection; private readonly ISubscriber _bus; private readonly ILogger _logger; private readonly RedisOptions _options; diff --git a/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs b/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs index d578e4b539..6c25e128b4 100644 --- a/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs +++ b/src/Microsoft.AspNetCore.SignalR.Redis/RedisOptions.cs @@ -12,10 +12,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis { public ConfigurationOptions Options { get; set; } = new ConfigurationOptions(); - public Func Factory { get; set; } + public Func Factory { get; set; } // TODO: Async - internal ConnectionMultiplexer Connect(TextWriter log) + internal IConnectionMultiplexer Connect(TextWriter log) { if (Factory == null) { diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs b/test/Common/TestClient.cs similarity index 100% rename from test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs rename to test/Common/TestClient.cs diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Microsoft.AspNetCore.SignalR.Redis.Tests.csproj b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Microsoft.AspNetCore.SignalR.Redis.Tests.csproj new file mode 100644 index 0000000000..cb7eb147a1 --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Microsoft.AspNetCore.SignalR.Redis.Tests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp2.0;net461 + netcoreapp2.0 + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs new file mode 100644 index 0000000000..b035ead1e6 --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs @@ -0,0 +1,352 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Threading.Tasks; +using System.Threading.Tasks.Channels; +using Microsoft.AspNetCore.SignalR.Internal.Protocol; +using Microsoft.AspNetCore.SignalR.Tests; +using Microsoft.AspNetCore.SignalR.Tests.Common; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Xunit; + +namespace Microsoft.AspNetCore.SignalR.Redis.Tests +{ + public class RedisHubLifetimeManagerTests + { + [Fact] + public async Task InvokeAllAsyncWritesToAllConnectionsOutput() + { + using (var client1 = new TestClient()) + using (var client2 = new TestClient()) + { + var output1 = Channel.CreateUnbounded(); + var output2 = Channel.CreateUnbounded(); + + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var connection1 = new HubConnectionContext(output1, client1.Connection); + var connection2 = new HubConnectionContext(output2, client2.Connection); + + await manager.OnConnectedAsync(connection1).OrTimeout(); + await manager.OnConnectedAsync(connection2).OrTimeout(); + + await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output1); + AssertMessage(output2); + } + } + + [Fact] + public async Task InvokeAllAsyncDoesNotWriteToDisconnectedConnectionsOutput() + { + using (var client1 = new TestClient()) + using (var client2 = new TestClient()) + { + var output1 = Channel.CreateUnbounded(); + var output2 = Channel.CreateUnbounded(); + + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var connection1 = new HubConnectionContext(output1, client1.Connection); + var connection2 = new HubConnectionContext(output2, client2.Connection); + + await manager.OnConnectedAsync(connection1).OrTimeout(); + await manager.OnConnectedAsync(connection2).OrTimeout(); + + await manager.OnDisconnectedAsync(connection2).OrTimeout(); + + await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output1); + + Assert.False(output2.In.TryRead(out var item)); + } + } + + [Fact] + public async Task InvokeGroupAsyncWritesToAllConnectionsInGroupOutput() + { + using (var client1 = new TestClient()) + using (var client2 = new TestClient()) + { + var output1 = Channel.CreateUnbounded(); + var output2 = Channel.CreateUnbounded(); + + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var connection1 = new HubConnectionContext(output1, client1.Connection); + var connection2 = new HubConnectionContext(output2, client2.Connection); + + await manager.OnConnectedAsync(connection1).OrTimeout(); + await manager.OnConnectedAsync(connection2).OrTimeout(); + + await manager.AddGroupAsync(connection1.ConnectionId, "gunit").OrTimeout(); + + await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output1); + + Assert.False(output2.In.TryRead(out var item)); + } + } + + [Fact] + public async Task InvokeConnectionAsyncWritesToConnectionOutput() + { + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var connection = new HubConnectionContext(output, client.Connection); + + await manager.OnConnectedAsync(connection).OrTimeout(); + + await manager.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output); + } + } + + [Fact] + public async Task InvokeConnectionAsyncOnNonExistentConnectionDoesNotThrow() + { + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + await manager.InvokeConnectionAsync("NotARealConnectionId", "Hello", new object[] { "World" }).OrTimeout(); + } + + [Fact] + public async Task InvokeAllAsyncWithMultipleServersWritesToAllConnectionsOutput() + { + var manager1 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var manager2 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client1 = new TestClient()) + using (var client2 = new TestClient()) + { + var output1 = Channel.CreateUnbounded(); + var output2 = Channel.CreateUnbounded(); + + var connection1 = new HubConnectionContext(output1, client1.Connection); + var connection2 = new HubConnectionContext(output2, client2.Connection); + + await manager1.OnConnectedAsync(connection1).OrTimeout(); + await manager2.OnConnectedAsync(connection2).OrTimeout(); + + await manager1.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output1); + AssertMessage(output2); + } + } + + [Fact] + public async Task InvokeAllAsyncWithMultipleServersDoesNotWriteToDisconnectedConnectionsOutput() + { + var manager1 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var manager2 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client1 = new TestClient()) + using (var client2 = new TestClient()) + { + var output1 = Channel.CreateUnbounded(); + var output2 = Channel.CreateUnbounded(); + + var connection1 = new HubConnectionContext(output1, client1.Connection); + var connection2 = new HubConnectionContext(output2, client2.Connection); + + await manager1.OnConnectedAsync(connection1).OrTimeout(); + await manager2.OnConnectedAsync(connection2).OrTimeout(); + + await manager2.OnDisconnectedAsync(connection2).OrTimeout(); + + await manager2.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output1); + + Assert.False(output2.In.TryRead(out var item)); + } + } + + [Fact] + public async Task InvokeConnectionAsyncOnServerWithoutConnectionWritesOutputToConnection() + { + var manager1 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var manager2 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + + var connection = new HubConnectionContext(output, client.Connection); + + await manager1.OnConnectedAsync(connection).OrTimeout(); + + await manager2.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output); + } + } + + [Fact] + public async Task InvokeGroupAsyncOnServerWithoutConnectionWritesOutputToGroupConnection() + { + var manager1 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var manager2 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + + var connection = new HubConnectionContext(output, client.Connection); + + await manager1.OnConnectedAsync(connection).OrTimeout(); + + await manager1.AddGroupAsync(connection.ConnectionId, "name").OrTimeout(); + + await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout(); + + AssertMessage(output); + } + } + + [Fact] + public async Task DisconnectConnectionRemovesConnectionFromGroup() + { + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + + var connection = new HubConnectionContext(output, client.Connection); + + await manager.OnConnectedAsync(connection).OrTimeout(); + + await manager.AddGroupAsync(connection.ConnectionId, "name").OrTimeout(); + + await manager.OnDisconnectedAsync(connection).OrTimeout(); + + await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout(); + + Assert.False(output.In.TryRead(out var item)); + } + } + + [Fact] + public async Task RemoveGroupFromLocalConnectionNotInGroupDoesNothing() + { + var manager = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + + var connection = new HubConnectionContext(output, client.Connection); + + await manager.OnConnectedAsync(connection).OrTimeout(); + + await manager.RemoveGroupAsync(connection.ConnectionId, "name").OrTimeout(); + } + } + + [Fact] + public async Task RemoveGroupFromConnectionOnDifferentServerNotInGroupDoesNothing() + { + var manager1 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + var manager2 = new RedisHubLifetimeManager(new LoggerFactory().CreateLogger>(), + Options.Create(new RedisOptions() + { + Factory = t => new TestConnectionMultiplexer() + })); + + using (var client = new TestClient()) + { + var output = Channel.CreateUnbounded(); + + var connection = new HubConnectionContext(output, client.Connection); + + await manager1.OnConnectedAsync(connection).OrTimeout(); + + await manager2.RemoveGroupAsync(connection.ConnectionId, "name").OrTimeout(); + } + } + + private void AssertMessage(Channel channel) + { + Assert.True(channel.In.TryRead(out var item)); + var message = item as InvocationMessage; + Assert.NotNull(message); + Assert.Equal("Hello", message.Target); + Assert.Single(message.Arguments); + Assert.Equal("World", (string)message.Arguments[0]); + } + + private class MyHub : Hub + { + + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/TestConnectionMultiplexer.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/TestConnectionMultiplexer.cs new file mode 100644 index 0000000000..98164b622e --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/TestConnectionMultiplexer.cs @@ -0,0 +1,329 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Threading.Tasks; +using StackExchange.Redis; + +namespace Microsoft.AspNetCore.SignalR.Redis.Tests +{ + public class TestConnectionMultiplexer : IConnectionMultiplexer + { + public string ClientName => throw new NotImplementedException(); + + public string Configuration => throw new NotImplementedException(); + + public int TimeoutMilliseconds => throw new NotImplementedException(); + + public long OperationCount => throw new NotImplementedException(); + + public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + public bool IsConnected => true; + + public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + public event EventHandler ErrorMessage + { + add { } + remove { } + } + + public event EventHandler ConnectionFailed + { + add { } + remove { } + } + + public event EventHandler InternalError + { + add { } + remove { } + } + + public event EventHandler ConnectionRestored + { + add { } + remove { } + } + + public event EventHandler ConfigurationChanged + { + add { } + remove { } + } + + public event EventHandler ConfigurationChangedBroadcast + { + add { } + remove { } + } + + public event EventHandler HashSlotMoved + { + add { } + remove { } + } + + private ISubscriber _subscriber = new TestSubscriber(); + + public void BeginProfiling(object forContext) + { + throw new NotImplementedException(); + } + + public void Close(bool allowCommandsToComplete = true) + { + throw new NotImplementedException(); + } + + public Task CloseAsync(bool allowCommandsToComplete = true) + { + throw new NotImplementedException(); + } + + public bool Configure(TextWriter log = null) + { + throw new NotImplementedException(); + } + + public Task ConfigureAsync(TextWriter log = null) + { + throw new NotImplementedException(); + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true) + { + throw new NotImplementedException(); + } + + public ServerCounters GetCounters() + { + throw new NotImplementedException(); + } + + public IDatabase GetDatabase(int db = -1, object asyncState = null) + { + throw new NotImplementedException(); + } + + public EndPoint[] GetEndPoints(bool configuredOnly = false) + { + throw new NotImplementedException(); + } + + public IServer GetServer(string host, int port, object asyncState = null) + { + throw new NotImplementedException(); + } + + public IServer GetServer(string hostAndPort, object asyncState = null) + { + throw new NotImplementedException(); + } + + public IServer GetServer(IPAddress host, int port) + { + throw new NotImplementedException(); + } + + public IServer GetServer(EndPoint endpoint, object asyncState = null) + { + throw new NotImplementedException(); + } + + public string GetStatus() + { + throw new NotImplementedException(); + } + + public void GetStatus(TextWriter log) + { + throw new NotImplementedException(); + } + + public string GetStormLog() + { + throw new NotImplementedException(); + } + + public ISubscriber GetSubscriber(object asyncState = null) + { + return _subscriber; + } + + public int HashSlot(RedisKey key) + { + throw new NotImplementedException(); + } + + public long PublishReconfigure(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public void RegisterProfiler(IProfiler profiler) + { + throw new NotImplementedException(); + } + + public void ResetStormLog() + { + throw new NotImplementedException(); + } + + public void Wait(Task task) + { + throw new NotImplementedException(); + } + + public T Wait(Task task) + { + throw new NotImplementedException(); + } + + public void WaitAll(params Task[] tasks) + { + throw new NotImplementedException(); + } + } + + public class TestSubscriber : ISubscriber + { + // _globalSubscriptions represents the Redis Server you are connected to. + // So when publishing from a TestSubscriber you fake sending through the server by grabbing the callbacks + // from the _globalSubscriptions and inoking them inplace. + private static ConcurrentDictionary>> _globalSubscriptions = + new ConcurrentDictionary>>(); + + private ConcurrentDictionary> _subscriptions = + new ConcurrentDictionary>(); + + public ConnectionMultiplexer Multiplexer => throw new NotImplementedException(); + + public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public bool IsConnected(RedisChannel channel = default) + { + throw new NotImplementedException(); + } + + public TimeSpan Ping(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public Task PingAsync(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) + { + if (_globalSubscriptions.TryGetValue(channel, out var handlers)) + { + foreach (var handler in handlers) + { + handler(channel, message); + } + } + + return handlers != null ? handlers.Count : 0; + } + + public async Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) + { + await Task.Yield(); + return Publish(channel, message, flags); + } + + public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) + { + _globalSubscriptions.AddOrUpdate(channel, _ => new List> { handler }, (_, list) => + { + list.Add(handler); + return list; + }); + _subscriptions.AddOrUpdate(channel, handler, (_, __) => handler); + } + + public Task SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) + { + Subscribe(channel, handler, flags); + return Task.CompletedTask; + } + + public EndPoint SubscribedEndpoint(RedisChannel channel) + { + throw new NotImplementedException(); + } + + public bool TryWait(Task task) + { + throw new NotImplementedException(); + } + + public void Unsubscribe(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) + { + _subscriptions.TryRemove(channel, out var handle); + if (_globalSubscriptions.TryGetValue(channel, out var list)) + { + list.Remove(handle); + } + } + + public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) + { + throw new NotImplementedException(); + } + + public Task UnsubscribeAsync(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) + { + Unsubscribe(channel, handler, flags); + return Task.CompletedTask; + } + + public void Wait(Task task) + { + throw new NotImplementedException(); + } + + public T Wait(Task task) + { + throw new NotImplementedException(); + } + + public void WaitAll(params Task[] tasks) + { + throw new NotImplementedException(); + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/Microsoft.AspNetCore.SignalR.Tests.csproj b/test/Microsoft.AspNetCore.SignalR.Tests/Microsoft.AspNetCore.SignalR.Tests.csproj index 7d3276cb62..23b117da4a 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/Microsoft.AspNetCore.SignalR.Tests.csproj +++ b/test/Microsoft.AspNetCore.SignalR.Tests/Microsoft.AspNetCore.SignalR.Tests.csproj @@ -20,6 +20,7 @@ +