Remove SignalR.Redis package (#3241)

This commit is contained in:
BrennanConroy 2018-10-31 15:16:44 -07:00 committed by GitHub
parent 867c315a56
commit cd4a1c036a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 5 additions and 3318 deletions

View File

@ -33,8 +33,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.Http.C
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Core", "src\Microsoft.AspNetCore.SignalR.Core\Microsoft.AspNetCore.SignalR.Core.csproj", "{42E76F87-92B6-45AB-BF07-6B811C0F2CAC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Redis", "src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj", "{59319B72-38BE-4041-8E5C-FF6938874CE8}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SocialWeather", "samples\SocialWeather\SocialWeather.csproj", "{8D789F94-CB74-45FD-ACE7-92AF6E55042E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Tests", "test\Microsoft.AspNetCore.SignalR.Tests\Microsoft.AspNetCore.SignalR.Tests.csproj", "{1CE2B3BE-056C-41E3-A5F5-6A1EF1D288BA}"
@ -69,8 +67,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmarks", "benchmarks",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Client", "src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj", "{BE982591-F4BB-42D9-ABD4-A5D44C65971E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Redis.Tests", "test\Microsoft.AspNetCore.SignalR.Redis.Tests\Microsoft.AspNetCore.SignalR.Redis.Tests.csproj", "{0B083AE6-86CA-4E0B-AE02-59154D1FD005}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtSample", "samples\JwtSample\JwtSample.csproj", "{6A7491D3-3C97-49BD-A71C-433AED657F30}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtClientSample", "samples\JwtClientSample\JwtClientSample.csproj", "{1A953296-E869-4DE2-A693-FD5FCDE27057}"
@ -113,10 +109,6 @@ Global
{42E76F87-92B6-45AB-BF07-6B811C0F2CAC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{42E76F87-92B6-45AB-BF07-6B811C0F2CAC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{42E76F87-92B6-45AB-BF07-6B811C0F2CAC}.Release|Any CPU.Build.0 = Release|Any CPU
{59319B72-38BE-4041-8E5C-FF6938874CE8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{59319B72-38BE-4041-8E5C-FF6938874CE8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{59319B72-38BE-4041-8E5C-FF6938874CE8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{59319B72-38BE-4041-8E5C-FF6938874CE8}.Release|Any CPU.Build.0 = Release|Any CPU
{8D789F94-CB74-45FD-ACE7-92AF6E55042E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8D789F94-CB74-45FD-ACE7-92AF6E55042E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8D789F94-CB74-45FD-ACE7-92AF6E55042E}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -177,10 +169,6 @@ Global
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BE982591-F4BB-42D9-ABD4-A5D44C65971E}.Release|Any CPU.Build.0 = Release|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0B083AE6-86CA-4E0B-AE02-59154D1FD005}.Release|Any CPU.Build.0 = Release|Any CPU
{6A7491D3-3C97-49BD-A71C-433AED657F30}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6A7491D3-3C97-49BD-A71C-433AED657F30}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6A7491D3-3C97-49BD-A71C-433AED657F30}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -233,7 +221,6 @@ Global
{C4AEAB04-F341-4539-B6C0-52368FB4BF9E} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
{42E76F87-92B6-45AB-BF07-6B811C0F2CAC} = {DA69F624-5398-4884-87E4-B816698CDE65}
{59319B72-38BE-4041-8E5C-FF6938874CE8} = {DA69F624-5398-4884-87E4-B816698CDE65}
{8D789F94-CB74-45FD-ACE7-92AF6E55042E} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
{1CE2B3BE-056C-41E3-A5F5-6A1EF1D288BA} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
{BA99C2A1-48F9-4FA5-B95A-9687A73B7CC9} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
@ -249,7 +236,6 @@ Global
{B0243F99-2D3F-4CC6-AD71-E3F891B64724} = {DA69F624-5398-4884-87E4-B816698CDE65}
{E081EE41-D95F-4AD2-BC0B-4B562C0A2A47} = {DA69F624-5398-4884-87E4-B816698CDE65}
{BE982591-F4BB-42D9-ABD4-A5D44C65971E} = {DA69F624-5398-4884-87E4-B816698CDE65}
{0B083AE6-86CA-4E0B-AE02-59154D1FD005} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
{6A7491D3-3C97-49BD-A71C-433AED657F30} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
{1A953296-E869-4DE2-A693-FD5FCDE27057} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
{0A0A6135-EA24-4307-95C2-CE1B7E164A5E} = {6A35B453-52EC-48AF-89CA-D4A69800F131}

View File

@ -14,7 +14,7 @@
<ItemGroup Condition="'$(BenchmarksTargetFramework)' == ''">
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Protocols.MessagePack\Microsoft.AspNetCore.SignalR.Protocols.MessagePack.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(MicrosoftAspNetCoreServerKestrelPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="$(MicrosoftExtensionsConfigurationCommandLinePackageVersion)" />

View File

@ -28,7 +28,7 @@ namespace BenchmarkServer
var redisConnectionString = _config["SignalRRedis"];
if (!string.IsNullOrEmpty(redisConnectionString))
{
signalrBuilder.AddRedis(redisConnectionString);
signalrBuilder.AddStackExchangeRedis(redisConnectionString);
}
}

View File

@ -62,7 +62,6 @@
<NETStandardLibrary20PackageVersion>2.0.3</NETStandardLibrary20PackageVersion>
<NewtonsoftJsonPackageVersion>11.0.2</NewtonsoftJsonPackageVersion>
<StackExchangeRedisPackageVersion>2.0.513</StackExchangeRedisPackageVersion>
<StackExchangeRedisStrongNamePackageVersion>1.2.6</StackExchangeRedisStrongNamePackageVersion>
<SystemBuffersPackageVersion>4.6.0-preview1-26907-04</SystemBuffersPackageVersion>
<SystemIOPipelinesPackageVersion>4.6.0-preview1-26907-04</SystemIOPipelinesPackageVersion>
<SystemMemoryPackageVersion>4.6.0-preview1-26717-04</SystemMemoryPackageVersion>

View File

@ -10,7 +10,7 @@
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Protocols.MessagePack\Microsoft.AspNetCore.SignalR.Protocols.MessagePack.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Http.Connections\Microsoft.AspNetCore.Http.Connections.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics" Version="$(MicrosoftAspNetCoreDiagnosticsPackageVersion)" />
@ -21,8 +21,8 @@
<PackageReference Include="Microsoft.AspNetCore.Cors" Version="$(MicrosoftAspNetCoreCorsPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="$(MicrosoftExtensionsLoggingConsolePackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="$(MicrosoftExtensionsConfigurationCommandLinePackageVersion)" />
<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufPackageVersion)" />
<PackageReference Include="System.Reactive.Linq" Version="$(SystemReactiveLinqPackageVersion)" />
<PackageReference Include="Microsoft.CSharp" Version="$(MicrosoftCSharpPackageVersion)" />
</ItemGroup>
<Target Name="CopyTSClient" BeforeTargets="AfterBuild">

View File

@ -29,7 +29,7 @@ namespace SignalRSamples
options.KeepAliveInterval = TimeSpan.FromSeconds(5);
})
.AddMessagePackProtocol();
//.AddRedis();
//.AddStackExchangeRedis();
services.AddCors(o =>
{

View File

@ -1,117 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
internal class AckHandler : IDisposable
{
private readonly ConcurrentDictionary<int, AckInfo> _acks = new ConcurrentDictionary<int, AckInfo>();
private readonly Timer _timer;
private readonly TimeSpan _ackThreshold = TimeSpan.FromSeconds(30);
private readonly TimeSpan _ackInterval = TimeSpan.FromSeconds(5);
private readonly object _lock = new object();
private bool _disposed;
public AckHandler()
{
// Don't capture the current ExecutionContext and its AsyncLocals onto the timer
bool restoreFlow = false;
try
{
if (!ExecutionContext.IsFlowSuppressed())
{
ExecutionContext.SuppressFlow();
restoreFlow = true;
}
_timer = new Timer(state => ((AckHandler)state).CheckAcks(), state: this, dueTime: _ackInterval, period: _ackInterval);
}
finally
{
// Restore the current ExecutionContext
if (restoreFlow)
{
ExecutionContext.RestoreFlow();
}
}
}
public Task CreateAck(int id)
{
lock (_lock)
{
if (_disposed)
{
return Task.CompletedTask;
}
return _acks.GetOrAdd(id, _ => new AckInfo()).Tcs.Task;
}
}
public void TriggerAck(int id)
{
if (_acks.TryRemove(id, out var ack))
{
ack.Tcs.TrySetResult(null);
}
}
private void CheckAcks()
{
if (_disposed)
{
return;
}
var utcNow = DateTime.UtcNow;
foreach (var pair in _acks)
{
var elapsed = utcNow - pair.Value.Created;
if (elapsed > _ackThreshold)
{
if (_acks.TryRemove(pair.Key, out var ack))
{
ack.Tcs.TrySetCanceled();
}
}
}
}
public void Dispose()
{
lock (_lock)
{
_disposed = true;
_timer.Dispose();
foreach (var pair in _acks)
{
if (_acks.TryRemove(pair.Key, out var ack))
{
ack.Tcs.TrySetCanceled();
}
}
}
}
private class AckInfo
{
public TaskCompletionSource<object> Tcs { get; private set; }
public DateTime Created { get; private set; }
public AckInfo()
{
Created = DateTime.UtcNow;
Tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
}
}

View File

@ -1,15 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
// The size of the enum is defined by the protocol. Do not change it. If you need more than 255 items,
// add an additional enum.
public enum GroupAction : byte
{
// These numbers are used by the protocol, do not change them and always use explicit assignment
// when adding new items to this enum. 0 is intentionally omitted
Add = 1,
Remove = 2,
}
}

View File

@ -1,68 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using MessagePack;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
internal static class MessagePackUtil
{
public static int ReadArrayHeader(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadArrayHeader(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static int ReadMapHeader(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadMapHeader(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static string ReadString(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadString(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static byte[] ReadBytes(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadBytes(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static int ReadInt32(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadInt32(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static byte ReadByte(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadByte(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
private static ArraySegment<byte> GetArray(ReadOnlyMemory<byte> data)
{
var isArray = MemoryMarshal.TryGetArray(data, out var array);
Debug.Assert(isArray);
return array;
}
}
}

View File

@ -1,75 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
internal class RedisChannels
{
private readonly string _prefix;
/// <summary>
/// Gets the name of the channel for sending to all connections.
/// </summary>
/// <remarks>
/// The payload on this channel is <see cref="RedisInvocation"/> objects containing
/// invocations to be sent to all connections
/// </remarks>
public string All { get; }
/// <summary>
/// Gets the name of the internal channel for group management messages.
/// </summary>
public string GroupManagement { get; }
public RedisChannels(string prefix)
{
_prefix = prefix;
All = prefix + ":all";
GroupManagement = prefix + ":internal:groups";
}
/// <summary>
/// Gets the name of the channel for sending a message to a specific connection.
/// </summary>
/// <param name="connectionId">The ID of the connection to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Connection(string connectionId)
{
return _prefix + ":connection:" + connectionId;
}
/// <summary>
/// Gets the name of the channel for sending a message to a named group of connections.
/// </summary>
/// <param name="groupName">The name of the group to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Group(string groupName)
{
return _prefix + ":group:" + groupName;
}
/// <summary>
/// Gets the name of the channel for sending a message to all collections associated with a user.
/// </summary>
/// <param name="userId">The ID of the user to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string User(string userId)
{
return _prefix + ":user:" + userId;
}
/// <summary>
/// Gets the name of the acknowledgement channel for the specified server.
/// </summary>
/// <param name="serverName">The name of the server to get the acknowledgement channel for.</param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Ack(string serverName)
{
return _prefix + ":internal:ack:" + serverName;
}
}
}

View File

@ -1,42 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
public readonly struct RedisGroupCommand
{
/// <summary>
/// Gets the ID of the group command.
/// </summary>
public int Id { get; }
/// <summary>
/// Gets the name of the server that sent the command.
/// </summary>
public string ServerName { get; }
/// <summary>
/// Gets the action to be performed on the group.
/// </summary>
public GroupAction Action { get; }
/// <summary>
/// Gets the group on which the action is performed.
/// </summary>
public string GroupName { get; }
/// <summary>
/// Gets the ID of the connection to be added or removed from the group.
/// </summary>
public string ConnectionId { get; }
public RedisGroupCommand(int id, string serverName, GroupAction action, string groupName, string connectionId)
{
Id = id;
ServerName = serverName;
Action = action;
GroupName = groupName;
ConnectionId = connectionId;
}
}
}

View File

@ -1,35 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
public readonly struct RedisInvocation
{
/// <summary>
/// Gets a list of connections that should be excluded from this invocation.
/// May be null to indicate that no connections are to be excluded.
/// </summary>
public IReadOnlyList<string> ExcludedConnectionIds { get; }
/// <summary>
/// Gets the message serialization cache containing serialized payloads for the message.
/// </summary>
public SerializedHubMessage Message { get; }
public RedisInvocation(SerializedHubMessage message, IReadOnlyList<string> excludedConnectionIds)
{
Message = message;
ExcludedConnectionIds = excludedConnectionIds;
}
public static RedisInvocation Create(string target, object[] arguments, IReadOnlyList<string> excludedConnectionIds = null)
{
return new RedisInvocation(
new SerializedHubMessage(new InvocationMessage(target, null, arguments)),
excludedConnectionIds);
}
}
}

View File

@ -1,119 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
// We don't want to use our nested static class here because RedisHubLifetimeManager is generic.
// We'd end up creating separate instances of all the LoggerMessage.Define values for each Hub.
internal static class RedisLog
{
private static readonly Action<ILogger, string, string, Exception> _connectingToEndpoints =
LoggerMessage.Define<string, string>(LogLevel.Information, new EventId(1, "ConnectingToEndpoints"), "Connecting to Redis endpoints: {Endpoints}. Using Server Name: {ServerName}");
private static readonly Action<ILogger, Exception> _connected =
LoggerMessage.Define(LogLevel.Information, new EventId(2, "Connected"), "Connected to Redis.");
private static readonly Action<ILogger, string, Exception> _subscribing =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(3, "Subscribing"), "Subscribing to channel: {Channel}.");
private static readonly Action<ILogger, string, Exception> _receivedFromChannel =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(4, "ReceivedFromChannel"), "Received message from Redis channel {Channel}.");
private static readonly Action<ILogger, string, Exception> _publishToChannel =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(5, "PublishToChannel"), "Publishing message to Redis channel {Channel}.");
private static readonly Action<ILogger, string, Exception> _unsubscribe =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(6, "Unsubscribe"), "Unsubscribing from channel: {Channel}.");
private static readonly Action<ILogger, Exception> _notConnected =
LoggerMessage.Define(LogLevel.Error, new EventId(7, "Connected"), "Not connected to Redis.");
private static readonly Action<ILogger, Exception> _connectionRestored =
LoggerMessage.Define(LogLevel.Information, new EventId(8, "ConnectionRestored"), "Connection to Redis restored.");
private static readonly Action<ILogger, Exception> _connectionFailed =
LoggerMessage.Define(LogLevel.Error, new EventId(9, "ConnectionFailed"), "Connection to Redis failed.");
private static readonly Action<ILogger, Exception> _failedWritingMessage =
LoggerMessage.Define(LogLevel.Warning, new EventId(10, "FailedWritingMessage"), "Failed writing message.");
private static readonly Action<ILogger, Exception> _internalMessageFailed =
LoggerMessage.Define(LogLevel.Warning, new EventId(11, "InternalMessageFailed"), "Error processing message for internal server message.");
public static void ConnectingToEndpoints(ILogger logger, EndPointCollection endpoints, string serverName)
{
if (logger.IsEnabled(LogLevel.Information))
{
if (endpoints.Count > 0)
{
_connectingToEndpoints(logger, string.Join(", ", endpoints.Select(e => EndPointCollection.ToString(e))), serverName, null);
}
}
}
public static void Connected(ILogger logger)
{
_connected(logger, null);
}
public static void Subscribing(ILogger logger, string channelName)
{
_subscribing(logger, channelName, null);
}
public static void ReceivedFromChannel(ILogger logger, string channelName)
{
_receivedFromChannel(logger, channelName, null);
}
public static void PublishToChannel(ILogger logger, string channelName)
{
_publishToChannel(logger, channelName, null);
}
public static void Unsubscribe(ILogger logger, string channelName)
{
_unsubscribe(logger, channelName, null);
}
public static void NotConnected(ILogger logger)
{
_notConnected(logger, null);
}
public static void ConnectionRestored(ILogger logger)
{
_connectionRestored(logger, null);
}
public static void ConnectionFailed(ILogger logger, Exception exception)
{
_connectionFailed(logger, exception);
}
public static void FailedWritingMessage(ILogger logger, Exception exception)
{
_failedWritingMessage(logger, exception);
}
public static void InternalMessageFailed(ILogger logger, Exception exception)
{
_internalMessageFailed(logger, exception);
}
// This isn't DefineMessage-based because it's just the simple TextWriter logging from ConnectionMultiplexer
public static void ConnectionMultiplexerMessage(ILogger logger, string message)
{
if (logger.IsEnabled(LogLevel.Debug))
{
// We tag it with EventId 100 though so it can be pulled out of logs easily.
logger.LogDebug(new EventId(100, "RedisConnectionLog"), message);
}
}
}
}

View File

@ -1,208 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using MessagePack;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
public class RedisProtocol
{
private readonly IReadOnlyList<IHubProtocol> _protocols;
public RedisProtocol(IReadOnlyList<IHubProtocol> protocols)
{
_protocols = protocols;
}
// The Redis Protocol:
// * The message type is known in advance because messages are sent to different channels based on type
// * Invocations are sent to the All, Group, Connection and User channels
// * Group Commands are sent to the GroupManagement channel
// * Acks are sent to the Acknowledgement channel.
// * See the Write[type] methods for a description of the protocol for each in-depth.
// * The "Variable length integer" is the length-prefixing format used by BinaryReader/BinaryWriter:
// * https://docs.microsoft.com/en-us/dotnet/api/system.io.binarywriter.write?view=netstandard-2.0
// * The "Length prefixed string" is the string format used by BinaryReader/BinaryWriter:
// * A 7-bit variable length integer encodes the length in bytes, followed by the encoded string in UTF-8.
public byte[] WriteInvocation(string methodName, object[] args) =>
WriteInvocation(methodName, args, excludedConnectionIds: null);
public byte[] WriteInvocation(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
{
// Written as a MessagePack 'arr' containing at least these items:
// * A MessagePack 'arr' of 'str's representing the excluded ids
// * [The output of WriteSerializedHubMessage, which is an 'arr']
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 2);
if (excludedConnectionIds != null && excludedConnectionIds.Count > 0)
{
MessagePackBinary.WriteArrayHeader(writer, excludedConnectionIds.Count);
foreach (var id in excludedConnectionIds)
{
MessagePackBinary.WriteString(writer, id);
}
}
else
{
MessagePackBinary.WriteArrayHeader(writer, 0);
}
WriteSerializedHubMessage(writer,
new SerializedHubMessage(new InvocationMessage(methodName, args)));
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public byte[] WriteGroupCommand(RedisGroupCommand command)
{
// Written as a MessagePack 'arr' containing at least these items:
// * An 'int': the Id of the command
// * A 'str': The server name
// * An 'int': The action (likely less than 0x7F and thus a single-byte fixnum)
// * A 'str': The group name
// * A 'str': The connection Id
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 5);
MessagePackBinary.WriteInt32(writer, command.Id);
MessagePackBinary.WriteString(writer, command.ServerName);
MessagePackBinary.WriteByte(writer, (byte)command.Action);
MessagePackBinary.WriteString(writer, command.GroupName);
MessagePackBinary.WriteString(writer, command.ConnectionId);
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public byte[] WriteAck(int messageId)
{
// Written as a MessagePack 'arr' containing at least these items:
// * An 'int': The Id of the command being acknowledged.
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 1);
MessagePackBinary.WriteInt32(writer, messageId);
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public RedisInvocation ReadInvocation(ReadOnlyMemory<byte> data)
{
// See WriteInvocation for the format
ValidateArraySize(ref data, 2, "Invocation");
// Read excluded Ids
IReadOnlyList<string> excludedConnectionIds = null;
var idCount = MessagePackUtil.ReadArrayHeader(ref data);
if (idCount > 0)
{
var ids = new string[idCount];
for (var i = 0; i < idCount; i++)
{
ids[i] = MessagePackUtil.ReadString(ref data);
}
excludedConnectionIds = ids;
}
// Read payload
var message = ReadSerializedHubMessage(ref data);
return new RedisInvocation(message, excludedConnectionIds);
}
public RedisGroupCommand ReadGroupCommand(ReadOnlyMemory<byte> data)
{
// See WriteGroupCommand for format.
ValidateArraySize(ref data, 5, "GroupCommand");
var id = MessagePackUtil.ReadInt32(ref data);
var serverName = MessagePackUtil.ReadString(ref data);
var action = (GroupAction)MessagePackUtil.ReadByte(ref data);
var groupName = MessagePackUtil.ReadString(ref data);
var connectionId = MessagePackUtil.ReadString(ref data);
return new RedisGroupCommand(id, serverName, action, groupName, connectionId);
}
public int ReadAck(ReadOnlyMemory<byte> data)
{
// See WriteAck for format
ValidateArraySize(ref data, 1, "Ack");
return MessagePackUtil.ReadInt32(ref data);
}
private void WriteSerializedHubMessage(Stream stream, SerializedHubMessage message)
{
// Written as a MessagePack 'map' where the keys are the name of the protocol (as a MessagePack 'str')
// and the values are the serialized blob (as a MessagePack 'bin').
MessagePackBinary.WriteMapHeader(stream, _protocols.Count);
foreach (var protocol in _protocols)
{
MessagePackBinary.WriteString(stream, protocol.Name);
var serialized = message.GetSerializedMessage(protocol);
var isArray = MemoryMarshal.TryGetArray(serialized, out var array);
Debug.Assert(isArray);
MessagePackBinary.WriteBytes(stream, array.Array, array.Offset, array.Count);
}
}
public static SerializedHubMessage ReadSerializedHubMessage(ref ReadOnlyMemory<byte> data)
{
var count = MessagePackUtil.ReadMapHeader(ref data);
var serializations = new SerializedMessage[count];
for (var i = 0; i < count; i++)
{
var protocol = MessagePackUtil.ReadString(ref data);
var serialized = MessagePackUtil.ReadBytes(ref data);
serializations[i] = new SerializedMessage(protocol, serialized);
}
return new SerializedHubMessage(serializations);
}
private static void ValidateArraySize(ref ReadOnlyMemory<byte> data, int expectedLength, string messageType)
{
var length = MessagePackUtil.ReadArrayHeader(ref data);
if (length < expectedLength)
{
throw new InvalidDataException($"Insufficient items in {messageType} array.");
}
}
}
}

View File

@ -1,63 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
internal class RedisSubscriptionManager
{
private readonly ConcurrentDictionary<string, HubConnectionStore> _subscriptions = new ConcurrentDictionary<string, HubConnectionStore>(StringComparer.Ordinal);
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
public async Task AddSubscriptionAsync(string id, HubConnectionContext connection, Func<string, HubConnectionStore, Task> subscribeMethod)
{
await _lock.WaitAsync();
try
{
var subscription = _subscriptions.GetOrAdd(id, _ => new HubConnectionStore());
subscription.Add(connection);
// Subscribe once
if (subscription.Count == 1)
{
await subscribeMethod(id, subscription);
}
}
finally
{
_lock.Release();
}
}
public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connection, Func<string, Task> unsubscribeMethod)
{
await _lock.WaitAsync();
try
{
if (!_subscriptions.TryGetValue(id, out var subscription))
{
return;
}
subscription.Remove(connection);
if (subscription.Count == 0)
{
_subscriptions.TryRemove(id, out _);
await unsubscribeMethod(id);
}
}
finally
{
_lock.Release();
}
}
}
}

View File

@ -1,23 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Redis for ASP.NET Core SignalR.</Description>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Common\JsonUtils.cs" Link="Internal\JsonUtils.cs" />
<Compile Include="..\Common\MemoryBufferWriter.cs" Link="Internal\MemoryBufferWriter.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" Version="$(MicrosoftExtensionsOptionsPackageVersion)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchangeRedisStrongNamePackageVersion)" />
<PackageReference Include="MessagePack" Version="$(MessagePackPackageVersion)" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Core\Microsoft.AspNetCore.SignalR.Core.csproj" />
</ItemGroup>
</Project>

View File

@ -1,69 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Redis;
using StackExchange.Redis;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Extension methods for configuring Redis-based scale-out for a SignalR Server in an <see cref="ISignalRServerBuilder" />.
/// </summary>
public static class RedisDependencyInjectionExtensions
{
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddRedis(this ISignalRServerBuilder signalrBuilder)
{
return AddRedis(signalrBuilder, o => { });
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="redisConnectionString">The connection string used to connect to the Redis server.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddRedis(this ISignalRServerBuilder signalrBuilder, string redisConnectionString)
{
return AddRedis(signalrBuilder, o =>
{
o.Configuration = ConfigurationOptions.Parse(redisConnectionString);
});
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="configure">A callback to configure the Redis options.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddRedis(this ISignalRServerBuilder signalrBuilder, Action<RedisOptions> configure)
{
signalrBuilder.Services.Configure(configure);
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisHubLifetimeManager<>));
return signalrBuilder;
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="redisConnectionString">The connection string used to connect to the Redis server.</param>
/// <param name="configure">A callback to configure the Redis options.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddRedis(this ISignalRServerBuilder signalrBuilder, string redisConnectionString, Action<RedisOptions> configure)
{
return AddRedis(signalrBuilder, o =>
{
o.Configuration = ConfigurationOptions.Parse(redisConnectionString);
configure(o);
});
}
}
}

View File

@ -1,587 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Redis
{
public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable where THub : Hub
{
private readonly HubConnectionStore _connections = new HubConnectionStore();
private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
private IConnectionMultiplexer _redisServerConnection;
private ISubscriber _bus;
private readonly ILogger _logger;
private readonly RedisOptions _options;
private readonly RedisChannels _channels;
private readonly string _serverName = GenerateServerName();
private readonly RedisProtocol _protocol;
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);
private readonly AckHandler _ackHandler;
private int _internalId;
public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
IOptions<RedisOptions> options,
IHubProtocolResolver hubProtocolResolver)
{
_logger = logger;
_options = options.Value;
_ackHandler = new AckHandler();
_channels = new RedisChannels(typeof(THub).FullName);
_protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);
RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName);
_ = EnsureRedisServerConnection();
}
public override async Task OnConnectedAsync(HubConnectionContext connection)
{
await EnsureRedisServerConnection();
var feature = new RedisFeature();
connection.Features.Set<IRedisFeature>(feature);
var connectionTask = Task.CompletedTask;
var userTask = Task.CompletedTask;
_connections.Add(connection);
connectionTask = SubscribeToConnection(connection);
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
userTask = SubscribeToUser(connection);
}
await Task.WhenAll(connectionTask, userTask);
}
public override Task OnDisconnectedAsync(HubConnectionContext connection)
{
_connections.Remove(connection);
var tasks = new List<Task>();
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Unsubscribe(_logger, connectionChannel);
tasks.Add(_bus.UnsubscribeAsync(connectionChannel));
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
if (groupNames != null)
{
// Copy the groups to an array here because they get removed from this collection
// in RemoveFromGroupAsync
foreach (var group in groupNames.ToArray())
{
// Use RemoveGroupAsyncCore because the connection is local and we don't want to
// accidentally go to other servers with our remove request.
tasks.Add(RemoveGroupAsyncCore(connection, group));
}
}
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
tasks.Add(RemoveUserAsync(connection));
}
return Task.WhenAll(tasks);
}
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.All, message);
}
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.All, message);
}
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
// If the connection is local we can skip sending the message through the bus since we require sticky connections.
// This also saves serializing and deserializing the message!
var connection = _connections[connectionId];
if (connection != null)
{
return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Connection(connectionId), message);
}
public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.User(userId), message);
}
public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return AddGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
}
public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return RemoveGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
}
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionIds == null)
{
throw new ArgumentNullException(nameof(connectionIds));
}
var publishTasks = new List<Task>(connectionIds.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var connectionId in connectionIds)
{
publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
}
return Task.WhenAll(publishTasks);
}
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupNames == null)
{
throw new ArgumentNullException(nameof(groupNames));
}
var publishTasks = new List<Task>(groupNames.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var groupName in groupNames)
{
if (!string.IsNullOrEmpty(groupName))
{
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
}
}
return Task.WhenAll(publishTasks);
}
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (userIds.Count > 0)
{
var payload = _protocol.WriteInvocation(methodName, args);
var publishTasks = new List<Task>(userIds.Count);
foreach (var userId in userIds)
{
if (!string.IsNullOrEmpty(userId))
{
publishTasks.Add(PublishAsync(_channels.User(userId), payload));
}
}
return Task.WhenAll(publishTasks);
}
return Task.CompletedTask;
}
private async Task PublishAsync(string channel, byte[] payload)
{
await EnsureRedisServerConnection();
RedisLog.PublishToChannel(_logger, channel);
await _bus.PublishAsync(channel, payload);
}
private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
lock (groupNames)
{
// Connection already in group
if (!groupNames.Add(groupName))
{
return Task.CompletedTask;
}
}
var groupChannel = _channels.Group(groupName);
return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync);
}
/// <summary>
/// This takes <see cref="HubConnectionContext"/> because we want to remove the connection from the
/// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
/// </summary>
private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var groupChannel = _channels.Group(groupName);
await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
if (groupNames != null)
{
lock (groupNames)
{
groupNames.Remove(groupName);
}
}
}
private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
{
var id = Interlocked.Increment(ref _internalId);
var ack = _ackHandler.CreateAck(id);
// Send Add/Remove Group to other servers and wait for an ack or timeout
var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId));
await PublishAsync(_channels.GroupManagement, message);
await ack;
}
private Task RemoveUserAsync(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.RemoveSubscriptionAsync(userChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
}
public void Dispose()
{
_bus?.UnsubscribeAll();
_redisServerConnection?.Dispose();
_ackHandler.Dispose();
}
private Task SubscribeToAll()
{
RedisLog.Subscribing(_logger, _channels.All);
return _bus.SubscribeAsync(_channels.All, async (c, data) =>
{
try
{
RedisLog.ReceivedFromChannel(_logger, _channels.All);
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List<Task>(_connections.Count);
foreach (var connection in _connections)
{
if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId))
{
tasks.Add(connection.WriteAsync(invocation.Message).AsTask());
}
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private Task SubscribeToGroupManagementChannel()
{
return _bus.SubscribeAsync(_channels.GroupManagement, async (c, data) =>
{
try
{
var groupMessage = _protocol.ReadGroupCommand((byte[])data);
var connection = _connections[groupMessage.ConnectionId];
if (connection == null)
{
// user not on this server
return;
}
if (groupMessage.Action == GroupAction.Remove)
{
await RemoveGroupAsyncCore(connection, groupMessage.GroupName);
}
if (groupMessage.Action == GroupAction.Add)
{
await AddGroupAsyncCore(connection, groupMessage.GroupName);
}
// Send an ack to the server that sent the original command.
await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id));
}
catch (Exception ex)
{
RedisLog.InternalMessageFailed(_logger, ex);
}
});
}
private Task SubscribeToAckChannel()
{
// Create server specific channel in order to send an ack to a single server
return _bus.SubscribeAsync(_channels.Ack(_serverName), (c, data) =>
{
var ackId = _protocol.ReadAck((byte[])data);
_ackHandler.TriggerAck(ackId);
});
}
private Task SubscribeToConnection(HubConnectionContext connection)
{
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Subscribing(_logger, connectionChannel);
return _bus.SubscribeAsync(connectionChannel, async (c, data) =>
{
var invocation = _protocol.ReadInvocation((byte[])data);
await connection.WriteAsync(invocation.Message);
});
}
private Task SubscribeToUser(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.AddSubscriptionAsync(userChannel, connection, (channelName, subscriptions) =>
{
RedisLog.Subscribing(_logger, channelName);
return _bus.SubscribeAsync(channelName, async (c, data) =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List<Task>();
foreach (var userConnection in subscriptions)
{
tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
});
}
private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
{
RedisLog.Subscribing(_logger, groupChannel);
return _bus.SubscribeAsync(groupChannel, async (c, data) =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List<Task>();
foreach (var groupConnection in groupConnections)
{
if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true)
{
continue;
}
tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private async Task EnsureRedisServerConnection()
{
if (_redisServerConnection == null)
{
await _connectionLock.WaitAsync();
try
{
if (_redisServerConnection == null)
{
var writer = new LoggerTextWriter(_logger);
_redisServerConnection = await _options.ConnectAsync(writer);
_bus = _redisServerConnection.GetSubscriber();
_redisServerConnection.ConnectionRestored += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionRestored(_logger);
};
_redisServerConnection.ConnectionFailed += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionFailed(_logger, e.Exception);
};
if (_redisServerConnection.IsConnected)
{
RedisLog.Connected(_logger);
}
else
{
RedisLog.NotConnected(_logger);
}
await SubscribeToAll();
await SubscribeToGroupManagementChannel();
await SubscribeToAckChannel();
}
}
finally
{
_connectionLock.Release();
}
}
}
private static string GenerateServerName()
{
// Use the machine name for convenient diagnostics, but add a guid to make it unique.
// Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29
return $"{Environment.MachineName}_{Guid.NewGuid():N}";
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
public LoggerTextWriter(ILogger logger)
{
_logger = logger;
}
public override Encoding Encoding => Encoding.UTF8;
public override void Write(char value)
{
}
public override void WriteLine(string value)
{
RedisLog.ConnectionMultiplexerMessage(_logger, value);
}
}
private interface IRedisFeature
{
HashSet<string> Groups { get; }
}
private class RedisFeature : IRedisFeature
{
public HashSet<string> Groups { get; } = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
}
}
}

View File

@ -1,50 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Redis
{
/// <summary>
/// Options used to configure <see cref="RedisHubLifetimeManager{THub}"/>.
/// </summary>
public class RedisOptions
{
/// <summary>
/// Gets or sets configuration options exposed by <c>StackExchange.Redis</c>.
/// </summary>
public ConfigurationOptions Configuration { get; set; } = new ConfigurationOptions
{
// Enable reconnecting by default
AbortOnConnectFail = false
};
/// <summary>
/// Gets or sets the Redis connection factory.
/// </summary>
public Func<TextWriter, Task<IConnectionMultiplexer>> ConnectionFactory { get; set; }
internal async Task<IConnectionMultiplexer> ConnectAsync(TextWriter log)
{
// Factory is publically settable. Assigning to a local variable before null check for thread safety.
var factory = ConnectionFactory;
if (factory == null)
{
// REVIEW: Should we do this?
if (Configuration.EndPoints.Count == 0)
{
Configuration.EndPoints.Add(IPAddress.Loopback, 0);
Configuration.SetDefaultPorts();
}
return await ConnectionMultiplexer.ConnectAsync(Configuration, log);
}
return await factory(log);
}
}
}

View File

@ -1,532 +0,0 @@
{
"AssemblyIdentity": "Microsoft.AspNetCore.SignalR.Redis, Version=1.0.0.0, Culture=neutral, PublicKeyToken=adb9793829ddae60",
"Types": [
{
"Name": "Microsoft.Extensions.DependencyInjection.RedisDependencyInjectionExtensions",
"Visibility": "Public",
"Kind": "Class",
"Abstract": true,
"Static": true,
"Sealed": true,
"ImplementedInterfaces": [],
"Members": [
{
"Kind": "Method",
"Name": "AddRedis",
"Parameters": [
{
"Name": "signalrBuilder",
"Type": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder"
}
],
"ReturnType": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder",
"Static": true,
"Extension": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "AddRedis",
"Parameters": [
{
"Name": "signalrBuilder",
"Type": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder"
},
{
"Name": "redisConnectionString",
"Type": "System.String"
}
],
"ReturnType": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder",
"Static": true,
"Extension": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "AddRedis",
"Parameters": [
{
"Name": "signalrBuilder",
"Type": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder"
},
{
"Name": "configure",
"Type": "System.Action<Microsoft.AspNetCore.SignalR.Redis.RedisOptions>"
}
],
"ReturnType": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder",
"Static": true,
"Extension": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "AddRedis",
"Parameters": [
{
"Name": "signalrBuilder",
"Type": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder"
},
{
"Name": "redisConnectionString",
"Type": "System.String"
},
{
"Name": "configure",
"Type": "System.Action<Microsoft.AspNetCore.SignalR.Redis.RedisOptions>"
}
],
"ReturnType": "Microsoft.AspNetCore.SignalR.ISignalRServerBuilder",
"Static": true,
"Extension": true,
"Visibility": "Public",
"GenericParameter": []
}
],
"GenericParameters": []
},
{
"Name": "Microsoft.AspNetCore.SignalR.Redis.RedisHubLifetimeManager<T0>",
"Visibility": "Public",
"Kind": "Class",
"BaseType": "Microsoft.AspNetCore.SignalR.HubLifetimeManager<T0>",
"ImplementedInterfaces": [
"System.IDisposable"
],
"Members": [
{
"Kind": "Method",
"Name": "OnConnectedAsync",
"Parameters": [
{
"Name": "connection",
"Type": "Microsoft.AspNetCore.SignalR.HubConnectionContext"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "OnDisconnectedAsync",
"Parameters": [
{
"Name": "connection",
"Type": "Microsoft.AspNetCore.SignalR.HubConnectionContext"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendAllAsync",
"Parameters": [
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendAllExceptAsync",
"Parameters": [
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "excludedConnectionIds",
"Type": "System.Collections.Generic.IReadOnlyList<System.String>"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendConnectionAsync",
"Parameters": [
{
"Name": "connectionId",
"Type": "System.String"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendGroupAsync",
"Parameters": [
{
"Name": "groupName",
"Type": "System.String"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendGroupExceptAsync",
"Parameters": [
{
"Name": "groupName",
"Type": "System.String"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "excludedConnectionIds",
"Type": "System.Collections.Generic.IReadOnlyList<System.String>"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendUserAsync",
"Parameters": [
{
"Name": "userId",
"Type": "System.String"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "AddToGroupAsync",
"Parameters": [
{
"Name": "connectionId",
"Type": "System.String"
},
{
"Name": "groupName",
"Type": "System.String"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "RemoveFromGroupAsync",
"Parameters": [
{
"Name": "connectionId",
"Type": "System.String"
},
{
"Name": "groupName",
"Type": "System.String"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendConnectionsAsync",
"Parameters": [
{
"Name": "connectionIds",
"Type": "System.Collections.Generic.IReadOnlyList<System.String>"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendGroupsAsync",
"Parameters": [
{
"Name": "groupNames",
"Type": "System.Collections.Generic.IReadOnlyList<System.String>"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "SendUsersAsync",
"Parameters": [
{
"Name": "userIds",
"Type": "System.Collections.Generic.IReadOnlyList<System.String>"
},
{
"Name": "methodName",
"Type": "System.String"
},
{
"Name": "args",
"Type": "System.Object[]"
},
{
"Name": "cancellationToken",
"Type": "System.Threading.CancellationToken",
"DefaultValue": "default(System.Threading.CancellationToken)"
}
],
"ReturnType": "System.Threading.Tasks.Task",
"Virtual": true,
"Override": true,
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "Dispose",
"Parameters": [],
"ReturnType": "System.Void",
"Sealed": true,
"Virtual": true,
"ImplementedInterface": "System.IDisposable",
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Constructor",
"Name": ".ctor",
"Parameters": [
{
"Name": "logger",
"Type": "Microsoft.Extensions.Logging.ILogger<Microsoft.AspNetCore.SignalR.Redis.RedisHubLifetimeManager<T0>>"
},
{
"Name": "options",
"Type": "Microsoft.Extensions.Options.IOptions<Microsoft.AspNetCore.SignalR.Redis.RedisOptions>"
},
{
"Name": "hubProtocolResolver",
"Type": "Microsoft.AspNetCore.SignalR.IHubProtocolResolver"
}
],
"Visibility": "Public",
"GenericParameter": []
}
],
"GenericParameters": [
{
"ParameterName": "THub",
"ParameterPosition": 0,
"BaseTypeOrInterfaces": [
"Microsoft.AspNetCore.SignalR.Hub"
]
}
]
},
{
"Name": "Microsoft.AspNetCore.SignalR.Redis.RedisOptions",
"Visibility": "Public",
"Kind": "Class",
"ImplementedInterfaces": [],
"Members": [
{
"Kind": "Method",
"Name": "get_Configuration",
"Parameters": [],
"ReturnType": "StackExchange.Redis.ConfigurationOptions",
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "set_Configuration",
"Parameters": [
{
"Name": "value",
"Type": "StackExchange.Redis.ConfigurationOptions"
}
],
"ReturnType": "System.Void",
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "get_ConnectionFactory",
"Parameters": [],
"ReturnType": "System.Func<System.IO.TextWriter, System.Threading.Tasks.Task<StackExchange.Redis.IConnectionMultiplexer>>",
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Method",
"Name": "set_ConnectionFactory",
"Parameters": [
{
"Name": "value",
"Type": "System.Func<System.IO.TextWriter, System.Threading.Tasks.Task<StackExchange.Redis.IConnectionMultiplexer>>"
}
],
"ReturnType": "System.Void",
"Visibility": "Public",
"GenericParameter": []
},
{
"Kind": "Constructor",
"Name": ".ctor",
"Parameters": [],
"Visibility": "Public",
"GenericParameter": []
}
],
"GenericParameters": []
}
]
}

View File

@ -1,192 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class Docker
{
private static readonly string _exeSuffix = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string _dockerContainerName = "redisTestContainer-1x";
private static readonly string _dockerMonitorContainerName = _dockerContainerName + "Monitor-1x";
private static readonly Lazy<Docker> _instance = new Lazy<Docker>(Create);
public static Docker Default => _instance.Value;
private readonly string _path;
public Docker(string path)
{
_path = path;
}
private static Docker Create()
{
var location = GetDockerLocation();
if (location == null)
{
return null;
}
var docker = new Docker(location);
docker.RunCommand("info --format '{{.OSType}}'", "docker info", out var output);
if (!string.Equals(output.Trim('\'', '"', '\r', '\n', ' '), "linux"))
{
Console.WriteLine($"'docker info' output: {output}");
return null;
}
return docker;
}
private static string GetDockerLocation()
{
// OSX + Docker + Redis don't play well together for some reason. We already have these tests covered on Linux and Windows
// So we are happy ignoring them on OSX
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
return null;
}
foreach (var dir in Environment.GetEnvironmentVariable("PATH").Split(Path.PathSeparator))
{
var candidate = Path.Combine(dir, "docker" + _exeSuffix);
if (File.Exists(candidate))
{
return candidate;
}
}
return null;
}
public void Start(ILogger logger)
{
logger.LogInformation("Starting docker container");
// stop container if there is one, could be from a previous test run, ignore failures
RunProcessAndWait(_path, $"stop {_dockerMonitorContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
RunProcessAndWait(_path, $"stop {_dockerContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var output);
// create and run docker container, remove automatically when stopped, map 6379 from the container to 6379 localhost
// use static name 'redisTestContainer' so if the container doesn't get removed we don't keep adding more
// use redis base docker image
// 20 second timeout to allow redis image to be downloaded, should be a rare occurrence, only happening when a new version is released
RunProcessAndThrowIfFailed(_path, $"run --rm -p 6380:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20));
// inspect the redis docker image and extract the IPAddress. Necessary when running tests from inside a docker container, spinning up a new docker container for redis
// outside the current container requires linking the networks (difficult to automate) or using the IP:Port combo
RunProcessAndWait(_path, "inspect --format=\"{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\" " + _dockerContainerName, "docker ipaddress", logger, TimeSpan.FromSeconds(5), out output);
output = output.Trim().Replace(Environment.NewLine, "");
// variable used by Startup.cs
Environment.SetEnvironmentVariable("REDIS_CONNECTION-PREV", $"{output}:6379");
var (monitorProcess, monitorOutput) = RunProcess(_path, $"run -i --name {_dockerMonitorContainerName} --link {_dockerContainerName}:redis --rm redis redis-cli -h redis -p 6379", "redis monitor", logger);
monitorProcess.StandardInput.WriteLine("MONITOR");
monitorProcess.StandardInput.Flush();
}
public void Stop(ILogger logger)
{
// Get logs from Redis container before stopping the container
RunProcessAndThrowIfFailed(_path, $"logs {_dockerContainerName}", "docker logs", logger, TimeSpan.FromSeconds(5));
logger.LogInformation("Stopping docker container");
RunProcessAndWait(_path, $"stop {_dockerMonitorContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
RunProcessAndWait(_path, $"stop {_dockerContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
}
public int RunCommand(string commandAndArguments, string prefix, out string output) =>
RunCommand(commandAndArguments, prefix, NullLogger.Instance, out output);
public int RunCommand(string commandAndArguments, string prefix, ILogger logger, out string output)
{
return RunProcessAndWait(_path, commandAndArguments, prefix, logger, TimeSpan.FromSeconds(5), out output);
}
private static void RunProcessAndThrowIfFailed(string fileName, string arguments, string prefix, ILogger logger, TimeSpan timeout)
{
var exitCode = RunProcessAndWait(fileName, arguments, prefix, logger, timeout, out var output);
if (exitCode != 0)
{
throw new Exception($"Command '{fileName} {arguments}' failed with exit code '{exitCode}'. Output:{Environment.NewLine}{output}");
}
}
private static int RunProcessAndWait(string fileName, string arguments, string prefix, ILogger logger, TimeSpan timeout, out string output)
{
var (process, lines) = RunProcess(fileName, arguments, prefix, logger);
if (!process.WaitForExit((int)timeout.TotalMilliseconds))
{
process.Close();
logger.LogError("Closing process '{processName}' because it is running longer than the configured timeout.", fileName);
}
// Need to WaitForExit without a timeout to guarantee the output stream has written everything
process.WaitForExit();
output = string.Join(Environment.NewLine, lines);
return process.ExitCode;
}
private static (Process, ConcurrentQueue<string>) RunProcess(string fileName, string arguments, string prefix, ILogger logger)
{
var process = new Process
{
StartInfo = new ProcessStartInfo
{
FileName = fileName,
Arguments = arguments,
UseShellExecute = false,
RedirectStandardError = true,
RedirectStandardOutput = true,
RedirectStandardInput = true
},
EnableRaisingEvents = true
};
var exitCode = 0;
var lines = new ConcurrentQueue<string>();
process.Exited += (_, __) => exitCode = process.ExitCode;
process.OutputDataReceived += (_, a) =>
{
LogIfNotNull(logger.LogInformation, $"'{prefix}' stdout: {{0}}", a.Data);
lines.Enqueue(a.Data);
};
process.ErrorDataReceived += (_, a) =>
{
LogIfNotNull(logger.LogError, $"'{prefix}' stderr: {{0}}", a.Data);
lines.Enqueue(a.Data);
};
process.Start();
process.BeginErrorReadLine();
process.BeginOutputReadLine();
return (process, lines);
}
private static void LogIfNotNull(Action<string, object[]> logger, string message, string data)
{
if (!string.IsNullOrEmpty(data))
{
logger(message, new[] { data });
}
}
}
}

View File

@ -1,31 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class EchoHub : Hub
{
public string Echo(string message)
{
return message;
}
public Task EchoGroup(string groupName, string message)
{
return Clients.Group(groupName).SendAsync("Echo", message);
}
public Task EchoUser(string userName, string message)
{
return Clients.User(userName).SendAsync("Echo", message);
}
public Task AddSelfToGroup(string groupName)
{
return Groups.AddToGroupAsync(Context.ConnectionId, groupName);
}
}
}

View File

@ -1,27 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Specification.Tests\Microsoft.AspNetCore.SignalR.Specification.Tests.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftExtensionsDependencyInjectionPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsLoggingPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Testing" Version="$(MicrosoftExtensionsLoggingTestingPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -1,41 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class RedisDependencyInjectionExtensionsTests
{
// No need to go too deep with these tests, or we're just testing StackExchange.Redis again :). It's the one doing the parsing.
[Theory]
[InlineData("testredis.example.com", "testredis.example.com", 0, null, false)]
[InlineData("testredis.example.com:6380,ssl=True", "testredis.example.com", 6380, null, true)]
[InlineData("testredis.example.com:6380,password=hunter2,ssl=True", "testredis.example.com", 6380, "hunter2", true)]
public void AddRedisWithConnectionStringProperlyParsesOptions(string connectionString, string host, int port, string password, bool useSsl)
{
var services = new ServiceCollection();
services.AddSignalR().AddRedis(connectionString);
var provider = services.BuildServiceProvider();
var options = provider.GetService<IOptions<RedisOptions>>();
Assert.NotNull(options.Value);
Assert.NotNull(options.Value.Configuration);
Assert.Equal(password, options.Value.Configuration.Password);
Assert.Collection(options.Value.Configuration.EndPoints,
endpoint =>
{
var dnsEndpoint = Assert.IsType<DnsEndPoint>(endpoint);
Assert.Equal(host, dnsEndpoint.Host);
Assert.Equal(port, dnsEndpoint.Port);
});
Assert.Equal(useSsl, options.Value.Configuration.Ssl);
}
}
}

View File

@ -1,198 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
// Disable running server tests in parallel so server logs can accurately be captured per test
[CollectionDefinition(Name, DisableParallelization = true)]
public class RedisEndToEndTestsCollection : ICollectionFixture<RedisServerFixture<Startup>>
{
public const string Name = nameof(RedisEndToEndTestsCollection);
}
[Collection(RedisEndToEndTestsCollection.Name)]
public class RedisEndToEndTests : VerifiableLoggedTest
{
private readonly RedisServerFixture<Startup> _serverFixture;
public RedisEndToEndTests(RedisServerFixture<Startup> serverFixture, ITestOutputHelper output) : base(output)
{
if (serverFixture == null)
{
throw new ArgumentNullException(nameof(serverFixture));
}
_serverFixture = serverFixture;
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task HubConnectionCanSendAndReceiveMessages(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(HubConnectionCanSendAndReceiveMessages)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory);
await connection.StartAsync().OrTimeout();
var str = await connection.InvokeAsync<string>("Echo", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", str);
await connection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task HubConnectionCanSendAndReceiveGroupMessages(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(HubConnectionCanSendAndReceiveGroupMessages)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory);
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory);
var tcs = new TaskCompletionSource<string>();
connection.On<string>("Echo", message => tcs.TrySetResult(message));
var tcs2 = new TaskCompletionSource<string>();
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
var groupName = $"TestGroup_{transportType}_{protocolName}_{Guid.NewGuid()}";
await secondConnection.StartAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.InvokeAsync("AddSelfToGroup", groupName).OrTimeout();
await secondConnection.InvokeAsync("AddSelfToGroup", groupName).OrTimeout();
await connection.InvokeAsync("EchoGroup", groupName, "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
Assert.Equal("Hello, World!", await tcs2.Task.OrTimeout());
await connection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var tcs = new TaskCompletionSource<string>();
connection.On<string>("Echo", message => tcs.TrySetResult(message));
var tcs2 = new TaskCompletionSource<string>();
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
await secondConnection.StartAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.InvokeAsync("EchoUser", "userA", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
Assert.Equal("Hello, World!", await tcs2.Task.OrTimeout());
await connection.DisposeAsync().OrTimeout();
await secondConnection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task CanSendAndReceiveUserMessagesWhenOneConnectionWithUserDisconnects(HttpTransportType transportType, string protocolName)
{
// Regression test:
// When multiple connections from the same user were connected and one left, it used to unsubscribe from the user channel
// Now we keep track of users connections and only unsubscribe when no users are listening
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(CanSendAndReceiveUserMessagesWhenOneConnectionWithUserDisconnects)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var firstConnection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var tcs = new TaskCompletionSource<string>();
firstConnection.On<string>("Echo", message => tcs.TrySetResult(message));
await secondConnection.StartAsync().OrTimeout();
await firstConnection.StartAsync().OrTimeout();
await secondConnection.DisposeAsync().OrTimeout();
await firstConnection.InvokeAsync("EchoUser", "userA", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
await firstConnection.DisposeAsync().OrTimeout();
}
}
private static HubConnection CreateConnection(string url, HttpTransportType transportType, IHubProtocol protocol, ILoggerFactory loggerFactory, string userName = null)
{
var hubConnectionBuilder = new HubConnectionBuilder()
.WithLoggerFactory(loggerFactory)
.WithUrl(url, transportType, httpConnectionOptions =>
{
if (!string.IsNullOrEmpty(userName))
{
httpConnectionOptions.Headers["UserName"] = userName;
}
});
hubConnectionBuilder.Services.AddSingleton(protocol);
return hubConnectionBuilder.Build();
}
private static IEnumerable<HttpTransportType> TransportTypes()
{
if (TestHelpers.IsWebSocketsSupported())
{
yield return HttpTransportType.WebSockets;
}
yield return HttpTransportType.ServerSentEvents;
yield return HttpTransportType.LongPolling;
}
public static IEnumerable<object[]> TransportTypesAndProtocolTypes
{
get
{
foreach (var transport in TransportTypes())
{
yield return new object[] { transport, "json" };
if (transport != HttpTransportType.ServerSentEvents)
{
yield return new object[] { transport, "messagepack" };
}
}
}
}
}
}

View File

@ -1,102 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.SignalR.Specification.Tests;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class RedisHubLifetimeManagerTests : ScaleoutHubLifetimeManagerTests<TestRedisServer>
{
private TestRedisServer _server;
public override HubLifetimeManager<MyHub> CreateNewHubLifetimeManager(TestRedisServer backplane)
{
return CreateLifetimeManager(backplane);
}
public override TestRedisServer CreateBackplane()
{
return new TestRedisServer();
}
public override HubLifetimeManager<MyHub> CreateNewHubLifetimeManager()
{
_server = new TestRedisServer();
return CreateLifetimeManager(_server);
}
public class TestObject
{
public string TestProperty { get; set; }
}
private RedisHubLifetimeManager<MyHub> CreateLifetimeManager(TestRedisServer server, MessagePackHubProtocolOptions messagePackOptions = null, JsonHubProtocolOptions jsonOptions = null)
{
var options = new RedisOptions() { ConnectionFactory = async (t) => await Task.FromResult(new TestConnectionMultiplexer(server)) };
messagePackOptions = messagePackOptions ?? new MessagePackHubProtocolOptions();
jsonOptions = jsonOptions ?? new JsonHubProtocolOptions();
return new RedisHubLifetimeManager<MyHub>(
NullLogger<RedisHubLifetimeManager<MyHub>>.Instance,
Options.Create(options),
new DefaultHubProtocolResolver(new IHubProtocol[]
{
new JsonHubProtocol(Options.Create(jsonOptions)),
new MessagePackHubProtocol(Options.Create(messagePackOptions)),
}, NullLogger<DefaultHubProtocolResolver>.Instance));
}
[Fact]
public async Task CamelCasedJsonIsPreservedAcrossRedisBoundary()
{
var server = new TestRedisServer();
var messagePackOptions = new MessagePackHubProtocolOptions();
var jsonOptions = new JsonHubProtocolOptions();
jsonOptions.PayloadSerializerSettings.ContractResolver = new CamelCasePropertyNamesContractResolver();
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
// The sending manager has serializer settings
var manager1 = CreateLifetimeManager(server, messagePackOptions, jsonOptions);
// The receiving one doesn't matter because of how we serialize!
var manager2 = CreateLifetimeManager(server);
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
await manager1.OnConnectedAsync(connection1).OrTimeout();
await manager2.OnConnectedAsync(connection2).OrTimeout();
await manager1.SendAllAsync("Hello", new object[] { new TestObject { TestProperty = "Foo" } });
var message = Assert.IsType<InvocationMessage>(await client2.ReadAsync().OrTimeout());
Assert.Equal("Hello", message.Target);
Assert.Collection(
message.Arguments,
arg0 =>
{
var dict = Assert.IsType<JObject>(arg0);
Assert.Collection(dict.Properties(),
prop =>
{
Assert.Equal("testProperty", prop.Name);
Assert.Equal("Foo", prop.Value.Value<string>());
});
});
}
}
}
}

View File

@ -1,202 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.AspNetCore.SignalR.Tests;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class RedisProtocolTests
{
private static Dictionary<string, ProtocolTestData<int>> _ackTestData = new[]
{
CreateTestData("Zero", 0, 0x91, 0x00),
CreateTestData("Fixnum", 42, 0x91, 0x2A),
CreateTestData("Uint8", 180, 0x91, 0xCC, 0xB4),
CreateTestData("Uint16", 384, 0x91, 0xCD, 0x01, 0x80),
CreateTestData("Uint32", 70_000, 0x91, 0xCE, 0x00, 0x01, 0x11, 0x70),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> AckTestData = _ackTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(AckTestData))]
public void ParseAck(string testName)
{
var testData = _ackTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var decoded = protocol.ReadAck(testData.Encoded);
Assert.Equal(testData.Decoded, decoded);
}
[Theory]
[MemberData(nameof(AckTestData))]
public void WriteAck(string testName)
{
var testData = _ackTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var encoded = protocol.WriteAck(testData.Decoded);
Assert.Equal(testData.Encoded, encoded);
}
private static Dictionary<string, ProtocolTestData<RedisGroupCommand>> _groupCommandTestData = new[]
{
CreateTestData("GroupAdd", new RedisGroupCommand(42, "S", GroupAction.Add, "G", "C" ), 0x95, 0x2A, 0xA1, (byte)'S', 0x01, 0xA1, (byte)'G', 0xA1, (byte)'C'),
CreateTestData("GroupRemove", new RedisGroupCommand(42, "S", GroupAction.Remove, "G", "C" ), 0x95, 0x2A, 0xA1, (byte)'S', 0x02, 0xA1, (byte)'G', 0xA1, (byte)'C'),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> GroupCommandTestData = _groupCommandTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(GroupCommandTestData))]
public void ParseGroupCommand(string testName)
{
var testData = _groupCommandTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var decoded = protocol.ReadGroupCommand(testData.Encoded);
Assert.Equal(testData.Decoded.Id, decoded.Id);
Assert.Equal(testData.Decoded.ServerName, decoded.ServerName);
Assert.Equal(testData.Decoded.Action, decoded.Action);
Assert.Equal(testData.Decoded.GroupName, decoded.GroupName);
Assert.Equal(testData.Decoded.ConnectionId, decoded.ConnectionId);
}
[Theory]
[MemberData(nameof(GroupCommandTestData))]
public void WriteGroupCommand(string testName)
{
var testData = _groupCommandTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var encoded = protocol.WriteGroupCommand(testData.Decoded);
Assert.Equal(testData.Encoded, encoded);
}
// The actual invocation message doesn't matter
private static InvocationMessage _testMessage = new InvocationMessage("target", Array.Empty<object>());
// We use a func so we are guaranteed to get a new SerializedHubMessage for each test
private static Dictionary<string, ProtocolTestData<Func<RedisInvocation>>> _invocationTestData = new[]
{
CreateTestData<Func<RedisInvocation>>(
"NoExcludedIds",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), null),
0x92,
0x90,
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
CreateTestData<Func<RedisInvocation>>(
"OneExcludedId",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), new [] { "a" }),
0x92,
0x91,
0xA1, (byte)'a',
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
CreateTestData<Func<RedisInvocation>>(
"ManyExcludedIds",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), new [] { "a", "b", "c", "d", "e", "f" }),
0x92,
0x96,
0xA1, (byte)'a',
0xA1, (byte)'b',
0xA1, (byte)'c',
0xA1, (byte)'d',
0xA1, (byte)'e',
0xA1, (byte)'f',
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> InvocationTestData = _invocationTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(InvocationTestData))]
public void ParseInvocation(string testName)
{
var testData = _invocationTestData[testName];
var hubProtocols = new[] { new DummyHubProtocol("p1"), new DummyHubProtocol("p2") };
var protocol = new RedisProtocol(hubProtocols);
var expected = testData.Decoded();
var decoded = protocol.ReadInvocation(testData.Encoded);
Assert.Equal(expected.ExcludedConnectionIds, decoded.ExcludedConnectionIds);
// Verify the deserialized object has the necessary serialized forms
foreach (var hubProtocol in hubProtocols)
{
Assert.Equal(
expected.Message.GetSerializedMessage(hubProtocol).ToArray(),
decoded.Message.GetSerializedMessage(hubProtocol).ToArray());
var writtenMessages = hubProtocol.GetWrittenMessages();
Assert.Collection(writtenMessages,
actualMessage =>
{
var invocation = Assert.IsType<InvocationMessage>(actualMessage);
Assert.Same(_testMessage.Target, invocation.Target);
Assert.Same(_testMessage.Arguments, invocation.Arguments);
});
}
}
[Theory]
[MemberData(nameof(InvocationTestData))]
public void WriteInvocation(string testName)
{
var testData = _invocationTestData[testName];
var protocol = new RedisProtocol(new[] { new DummyHubProtocol("p1"), new DummyHubProtocol("p2") });
// Actual invocation doesn't matter because we're using a dummy hub protocol.
// But the dummy protocol will check that we gave it the test message to make sure everything flows through properly.
var expected = testData.Decoded();
var encoded = protocol.WriteInvocation(_testMessage.Target, _testMessage.Arguments, expected.ExcludedConnectionIds);
Assert.Equal(testData.Encoded, encoded);
}
// Create ProtocolTestData<T> using the Power of Type Inference(TM).
private static ProtocolTestData<T> CreateTestData<T>(string name, T decoded, params byte[] encoded)
=> new ProtocolTestData<T>(name, decoded, encoded);
public class ProtocolTestData<T>
{
public string Name { get; }
public T Decoded { get; }
public byte[] Encoded { get; }
public ProtocolTestData(string name, T decoded, byte[] encoded)
{
Name = name;
Decoded = decoded;
Encoded = encoded;
}
}
}
}

View File

@ -1,64 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class RedisServerFixture<TStartup> : IDisposable
where TStartup : class
{
public ServerFixture<TStartup> FirstServer { get; private set; }
public ServerFixture<TStartup> SecondServer { get; private set; }
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly IDisposable _logToken;
public RedisServerFixture()
{
// Docker is not available on the machine, tests using this fixture
// should be using SkipIfDockerNotPresentAttribute and will be skipped.
if (Docker.Default == null)
{
return;
}
var testLog = AssemblyTestLog.ForAssembly(typeof(RedisServerFixture<TStartup>).Assembly);
_logToken = testLog.StartTestLog(null, $"{nameof(RedisServerFixture<TStartup>)}_{typeof(TStartup).Name}", out _loggerFactory, LogLevel.Trace, "RedisServerFixture");
_logger = _loggerFactory.CreateLogger<RedisServerFixture<TStartup>>();
Docker.Default.Start(_logger);
FirstServer = StartServer();
SecondServer = StartServer();
}
private ServerFixture<TStartup> StartServer()
{
try
{
return new ServerFixture<TStartup>(_loggerFactory);
}
catch (Exception ex)
{
_logger.LogError(ex, "Server failed to start.");
throw;
}
}
public void Dispose()
{
if (Docker.Default != null)
{
FirstServer.Dispose();
SecondServer.Dispose();
Docker.Default.Stop(_logger);
_logToken.Dispose();
}
}
}
}

View File

@ -1,39 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Testing.xunit;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class SkipIfDockerNotPresentAttribute : Attribute, ITestCondition
{
public bool IsMet => CheckDocker();
public string SkipReason { get; private set; } = "Docker is not available";
private bool CheckDocker()
{
if(Docker.Default != null)
{
// Docker is present, but is it working?
if (Docker.Default.RunCommand("ps", "docker ps", out var output) != 0)
{
SkipReason = $"Failed to invoke test command 'docker ps'. Output: {output}";
}
else
{
// We have a docker
return true;
}
}
else
{
SkipReason = "Docker is not installed on the host machine.";
}
// If we get here, we don't have a docker
return false;
}
}
}

View File

@ -1,49 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Primitives;
namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSignalR(options =>
{
options.EnableDetailedErrors = true;
})
.AddMessagePackProtocol()
.AddRedis(options =>
{
options.Configuration.EndPoints.Add(Environment.GetEnvironmentVariable("REDIS_CONNECTION-PREV"));
});
services.AddSingleton<IUserIdProvider, UserNameIdProvider>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseSignalR(options => options.MapHub<EchoHub>("/echo"));
}
private class UserNameIdProvider : IUserIdProvider
{
public string GetUserId(HubConnectionContext connection)
{
// This is an AWFUL way to authenticate users! We're just using it for test purposes.
var userNameHeader = connection.GetHttpContext().Request.Headers["UserName"];
if (!StringValues.IsNullOrEmpty(userNameHeader))
{
return userNameHeader;
}
return null;
}
}
}
}

View File

@ -1,350 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.Tests
{
public class TestConnectionMultiplexer : IConnectionMultiplexer
{
public string ClientName => throw new NotImplementedException();
public string Configuration => throw new NotImplementedException();
public int TimeoutMilliseconds => throw new NotImplementedException();
public long OperationCount => throw new NotImplementedException();
public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public bool IsConnected => true;
public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public event EventHandler<RedisErrorEventArgs> ErrorMessage
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed
{
add { }
remove { }
}
public event EventHandler<InternalErrorEventArgs> InternalError
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChanged
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast
{
add { }
remove { }
}
public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
{
add { }
remove { }
}
private readonly ISubscriber _subscriber;
public TestConnectionMultiplexer(TestRedisServer server)
{
_subscriber = new TestSubscriber(server);
}
public void BeginProfiling(object forContext)
{
throw new NotImplementedException();
}
public void Close(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public Task CloseAsync(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public bool Configure(TextWriter log = null)
{
throw new NotImplementedException();
}
public Task<bool> ConfigureAsync(TextWriter log = null)
{
throw new NotImplementedException();
}
public void Dispose()
{
throw new NotImplementedException();
}
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
throw new NotImplementedException();
}
public ServerCounters GetCounters()
{
throw new NotImplementedException();
}
public IDatabase GetDatabase(int db = -1, object asyncState = null)
{
throw new NotImplementedException();
}
public EndPoint[] GetEndPoints(bool configuredOnly = false)
{
throw new NotImplementedException();
}
public IServer GetServer(string host, int port, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(string hostAndPort, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(IPAddress host, int port)
{
throw new NotImplementedException();
}
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
throw new NotImplementedException();
}
public string GetStatus()
{
throw new NotImplementedException();
}
public void GetStatus(TextWriter log)
{
throw new NotImplementedException();
}
public string GetStormLog()
{
throw new NotImplementedException();
}
public ISubscriber GetSubscriber(object asyncState = null)
{
return _subscriber;
}
public int HashSlot(RedisKey key)
{
throw new NotImplementedException();
}
public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public void RegisterProfiler(IProfiler profiler)
{
throw new NotImplementedException();
}
public void ResetStormLog()
{
throw new NotImplementedException();
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
}
public class TestRedisServer
{
private readonly ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>> _subscriptions =
new ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>>();
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (_subscriptions.TryGetValue(channel, out var handlers))
{
foreach (var handler in handlers)
{
handler(channel, message);
}
}
return handlers != null ? handlers.Count : 0;
}
public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
_subscriptions.AddOrUpdate(channel, _ => new List<Action<RedisChannel, RedisValue>> { handler }, (_, list) =>
{
list.Add(handler);
return list;
});
}
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
if (_subscriptions.TryGetValue(channel, out var list))
{
list.Remove(handler);
}
}
}
public class TestSubscriber : ISubscriber
{
private readonly TestRedisServer _server;
public ConnectionMultiplexer Multiplexer => throw new NotImplementedException();
public TestSubscriber(TestRedisServer server)
{
_server = server;
}
public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public bool IsConnected(RedisChannel channel = default)
{
throw new NotImplementedException();
}
public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
return _server.Publish(channel, message, flags);
}
public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
await Task.Yield();
return Publish(channel, message, flags);
}
public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
_server.Subscribe(channel, handler, flags);
}
public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
Subscribe(channel, handler, flags);
return Task.CompletedTask;
}
public EndPoint SubscribedEndpoint(RedisChannel channel)
{
throw new NotImplementedException();
}
public bool TryWait(Task task)
{
throw new NotImplementedException();
}
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
_server.Unsubscribe(channel, handler, flags);
}
public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
Unsubscribe(channel, handler, flags);
return Task.CompletedTask;
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
}
}