Adding Redis Unit tests (#1013)

This commit is contained in:
BrennanConroy 2017-10-17 15:48:29 -07:00 committed by GitHub
parent 878a70226c
commit 0e70c7950b
9 changed files with 719 additions and 8 deletions

View File

@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26823.1
VisualStudioVersion = 15.0.26919.1
MinimumVisualStudioVersion = 15.0.26730.03
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-5398-4884-87E4-B816698CDE65}"
ProjectSection(SolutionItems) = preProject
@ -85,7 +85,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Socket
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmarks", "benchmarks", "{8A4582C8-DC59-4B61-BCE7-119FBAA99EFB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Client", "src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj", "{BE982591-F4BB-42D9-ABD4-A5D44C65971E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Client", "src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj", "{BE982591-F4BB-42D9-ABD4-A5D44C65971E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Redis.Tests", "test\Microsoft.AspNetCore.SignalR.Redis.Tests\Microsoft.AspNetCore.SignalR.Redis.Tests.csproj", "{0B083AE6-86CA-4E0B-AE02-59154D1FD005}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -189,6 +191,10 @@ Global
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.Build.0 = Release|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -219,6 +225,7 @@ Global
{B0243F99-2D3F-4CC6-AD71-E3F891B64724} = {DA69F624-5398-4884-87E4-B816698CDE65}
{E081EE41-D95F-4AD2-BC0B-4B562C0A2A47} = {DA69F624-5398-4884-87E4-B816698CDE65}
{BE982591-F4BB-42D9-ABD4-A5D44C65971E} = {DA69F624-5398-4884-87E4-B816698CDE65}
{0B083AE6-86CA-4E0B-AE02-59154D1FD005} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7945A4E4-ACDB-4F6E-95CA-6AC6E7C2CD59}

View File

@ -28,7 +28,7 @@ namespace ChatSample
private const int ScanInterval = 5; //seconds
private const int ServerInactivityTimeout = 30; // seconds
private readonly ConnectionMultiplexer _redisConnection;
private readonly IConnectionMultiplexer _redisConnection;
private readonly IDatabase _redisDatabase;
private readonly ISubscriber _redisSubscriber;
@ -88,7 +88,7 @@ namespace ChatSample
});
}
private (ConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options)
private (IConnectionMultiplexer, IDatabase) StartRedisConnection(RedisOptions options)
{
// TODO: handle connection failures
var redisConnection = ConnectToRedis(options, _logger);
@ -101,7 +101,7 @@ namespace ChatSample
return (redisConnection, redisDatabase);
}
private static ConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
private static IConnectionMultiplexer ConnectToRedis(RedisOptions options, ILogger logger)
{
var loggerTextWriter = new LoggerTextWriter(logger);
if (options.Factory != null)

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
private readonly HubConnectionList _connections = new HubConnectionList();
// TODO: Investigate "memory leak" entries never get removed
private readonly ConcurrentDictionary<string, GroupData> _groups = new ConcurrentDictionary<string, GroupData>();
private readonly ConnectionMultiplexer _redisServerConnection;
private readonly IConnectionMultiplexer _redisServerConnection;
private readonly ISubscriber _bus;
private readonly ILogger _logger;
private readonly RedisOptions _options;

View File

@ -12,10 +12,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
public ConfigurationOptions Options { get; set; } = new ConfigurationOptions();
public Func<TextWriter, ConnectionMultiplexer> Factory { get; set; }
public Func<TextWriter, IConnectionMultiplexer> Factory { get; set; }
// TODO: Async
internal ConnectionMultiplexer Connect(TextWriter log)
internal IConnectionMultiplexer Connect(TextWriter log)
{
if (Factory == null)
{

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netcoreapp2.0;net461</TargetFrameworks>
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netcoreapp2.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Common\TestClient.cs" Link="TestClient.cs" />
<Compile Include="..\Common\TaskExtensions.cs" Link="TaskExtensions.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Sockets\Microsoft.AspNetCore.Sockets.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,352 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class RedisHubLifetimeManagerTests
{
[Fact]
public async Task InvokeAllAsyncWritesToAllConnectionsOutput()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var output1 = Channel.CreateUnbounded<HubMessage>();
var output2 = Channel.CreateUnbounded<HubMessage>();
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var connection1 = new HubConnectionContext(output1, client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output1);
AssertMessage(output2);
}
}
[Fact]
public async Task InvokeAllAsyncDoesNotWriteToDisconnectedConnectionsOutput()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var output1 = Channel.CreateUnbounded<HubMessage>();
var output2 = Channel.CreateUnbounded<HubMessage>();
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var connection1 = new HubConnectionContext(output1, client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
await manager.OnDisconnectedAsync(connection2).OrTimeout();
await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
}
}
[Fact]
public async Task InvokeGroupAsyncWritesToAllConnectionsInGroupOutput()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var output1 = Channel.CreateUnbounded<HubMessage>();
var output2 = Channel.CreateUnbounded<HubMessage>();
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var connection1 = new HubConnectionContext(output1, client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();
await manager.AddGroupAsync(connection1.ConnectionId, "gunit").OrTimeout();
await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
}
}
[Fact]
public async Task InvokeConnectionAsyncWritesToConnectionOutput()
{
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var connection = new HubConnectionContext(output, client.Connection);
await manager.OnConnectedAsync(connection).OrTimeout();
await manager.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
}
}
[Fact]
public async Task InvokeConnectionAsyncOnNonExistentConnectionDoesNotThrow()
{
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
await manager.InvokeConnectionAsync("NotARealConnectionId", "Hello", new object[] { "World" }).OrTimeout();
}
[Fact]
public async Task InvokeAllAsyncWithMultipleServersWritesToAllConnectionsOutput()
{
var manager1 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var manager2 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var output1 = Channel.CreateUnbounded<HubMessage>();
var output2 = Channel.CreateUnbounded<HubMessage>();
var connection1 = new HubConnectionContext(output1, client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager1.OnConnectedAsync(connection1).OrTimeout();
await manager2.OnConnectedAsync(connection2).OrTimeout();
await manager1.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output1);
AssertMessage(output2);
}
}
[Fact]
public async Task InvokeAllAsyncWithMultipleServersDoesNotWriteToDisconnectedConnectionsOutput()
{
var manager1 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var manager2 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var output1 = Channel.CreateUnbounded<HubMessage>();
var output2 = Channel.CreateUnbounded<HubMessage>();
var connection1 = new HubConnectionContext(output1, client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager1.OnConnectedAsync(connection1).OrTimeout();
await manager2.OnConnectedAsync(connection2).OrTimeout();
await manager2.OnDisconnectedAsync(connection2).OrTimeout();
await manager2.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
}
}
[Fact]
public async Task InvokeConnectionAsyncOnServerWithoutConnectionWritesOutputToConnection()
{
var manager1 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var manager2 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var connection = new HubConnectionContext(output, client.Connection);
await manager1.OnConnectedAsync(connection).OrTimeout();
await manager2.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
}
}
[Fact]
public async Task InvokeGroupAsyncOnServerWithoutConnectionWritesOutputToGroupConnection()
{
var manager1 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var manager2 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var connection = new HubConnectionContext(output, client.Connection);
await manager1.OnConnectedAsync(connection).OrTimeout();
await manager1.AddGroupAsync(connection.ConnectionId, "name").OrTimeout();
await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
}
}
[Fact]
public async Task DisconnectConnectionRemovesConnectionFromGroup()
{
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var connection = new HubConnectionContext(output, client.Connection);
await manager.OnConnectedAsync(connection).OrTimeout();
await manager.AddGroupAsync(connection.ConnectionId, "name").OrTimeout();
await manager.OnDisconnectedAsync(connection).OrTimeout();
await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
Assert.False(output.In.TryRead(out var item));
}
}
[Fact]
public async Task RemoveGroupFromLocalConnectionNotInGroupDoesNothing()
{
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var connection = new HubConnectionContext(output, client.Connection);
await manager.OnConnectedAsync(connection).OrTimeout();
await manager.RemoveGroupAsync(connection.ConnectionId, "name").OrTimeout();
}
}
[Fact]
public async Task RemoveGroupFromConnectionOnDifferentServerNotInGroupDoesNothing()
{
var manager1 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var manager2 = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
using (var client = new TestClient())
{
var output = Channel.CreateUnbounded<HubMessage>();
var connection = new HubConnectionContext(output, client.Connection);
await manager1.OnConnectedAsync(connection).OrTimeout();
await manager2.RemoveGroupAsync(connection.ConnectionId, "name").OrTimeout();
}
}
private void AssertMessage(Channel<HubMessage> channel)
{
Assert.True(channel.In.TryRead(out var item));
var message = item as InvocationMessage;
Assert.NotNull(message);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
}
private class MyHub : Hub
{
}
}
}

View File

@ -0,0 +1,329 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class TestConnectionMultiplexer : IConnectionMultiplexer
{
public string ClientName => throw new NotImplementedException();
public string Configuration => throw new NotImplementedException();
public int TimeoutMilliseconds => throw new NotImplementedException();
public long OperationCount => throw new NotImplementedException();
public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public bool IsConnected => true;
public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public event EventHandler<RedisErrorEventArgs> ErrorMessage
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed
{
add { }
remove { }
}
public event EventHandler<InternalErrorEventArgs> InternalError
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChanged
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast
{
add { }
remove { }
}
public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
{
add { }
remove { }
}
private ISubscriber _subscriber = new TestSubscriber();
public void BeginProfiling(object forContext)
{
throw new NotImplementedException();
}
public void Close(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public Task CloseAsync(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public bool Configure(TextWriter log = null)
{
throw new NotImplementedException();
}
public Task<bool> ConfigureAsync(TextWriter log = null)
{
throw new NotImplementedException();
}
public void Dispose()
{
throw new NotImplementedException();
}
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
throw new NotImplementedException();
}
public ServerCounters GetCounters()
{
throw new NotImplementedException();
}
public IDatabase GetDatabase(int db = -1, object asyncState = null)
{
throw new NotImplementedException();
}
public EndPoint[] GetEndPoints(bool configuredOnly = false)
{
throw new NotImplementedException();
}
public IServer GetServer(string host, int port, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(string hostAndPort, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(IPAddress host, int port)
{
throw new NotImplementedException();
}
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
throw new NotImplementedException();
}
public string GetStatus()
{
throw new NotImplementedException();
}
public void GetStatus(TextWriter log)
{
throw new NotImplementedException();
}
public string GetStormLog()
{
throw new NotImplementedException();
}
public ISubscriber GetSubscriber(object asyncState = null)
{
return _subscriber;
}
public int HashSlot(RedisKey key)
{
throw new NotImplementedException();
}
public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public void RegisterProfiler(IProfiler profiler)
{
throw new NotImplementedException();
}
public void ResetStormLog()
{
throw new NotImplementedException();
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
}
public class TestSubscriber : ISubscriber
{
// _globalSubscriptions represents the Redis Server you are connected to.
// So when publishing from a TestSubscriber you fake sending through the server by grabbing the callbacks
// from the _globalSubscriptions and inoking them inplace.
private static ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>> _globalSubscriptions =
new ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>>();
private ConcurrentDictionary<RedisChannel, Action<RedisChannel, RedisValue>> _subscriptions =
new ConcurrentDictionary<RedisChannel, Action<RedisChannel, RedisValue>>();
public ConnectionMultiplexer Multiplexer => throw new NotImplementedException();
public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public bool IsConnected(RedisChannel channel = default)
{
throw new NotImplementedException();
}
public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (_globalSubscriptions.TryGetValue(channel, out var handlers))
{
foreach (var handler in handlers)
{
handler(channel, message);
}
}
return handlers != null ? handlers.Count : 0;
}
public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
await Task.Yield();
return Publish(channel, message, flags);
}
public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
_globalSubscriptions.AddOrUpdate(channel, _ => new List<Action<RedisChannel, RedisValue>> { handler }, (_, list) =>
{
list.Add(handler);
return list;
});
_subscriptions.AddOrUpdate(channel, handler, (_, __) => handler);
}
public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
Subscribe(channel, handler, flags);
return Task.CompletedTask;
}
public EndPoint SubscribedEndpoint(RedisChannel channel)
{
throw new NotImplementedException();
}
public bool TryWait(Task task)
{
throw new NotImplementedException();
}
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
_subscriptions.TryRemove(channel, out var handle);
if (_globalSubscriptions.TryGetValue(channel, out var list))
{
list.Remove(handle);
}
}
public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
Unsubscribe(channel, handler, flags);
return Task.CompletedTask;
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
}
}

View File

@ -20,6 +20,7 @@
<Compile Include="..\Common\ServerFixture.cs" Link="ServerFixture.cs" />
<Compile Include="..\Common\TaskExtensions.cs" Link="TaskExtensions.cs" />
<Compile Include="..\Common\TestHelpers.cs" Link="TestHelpers.cs" />
<Compile Include="..\Common\TestClient.cs" Link="TestClient.cs" />
</ItemGroup>
<ItemGroup>