Merge branch 'release/2.2'

This commit is contained in:
BrennanConroy 2018-10-11 10:25:47 -07:00
commit af62105cc6
67 changed files with 3307 additions and 331 deletions

View File

@ -89,7 +89,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Crankier", "benchmarkapps\C
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmarkapps", "benchmarkapps", "{43F352F3-4E2B-4ED7-901B-36E6671251F5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Specification.Tests", "src\Microsoft.AspNetCore.SignalR.Specification.Tests\Microsoft.AspNetCore.SignalR.Specification.Tests.csproj", "{2B03333F-3ACD-474C-862B-FA97D3BA03B5}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Specification.Tests", "src\Microsoft.AspNetCore.SignalR.Specification.Tests\Microsoft.AspNetCore.SignalR.Specification.Tests.csproj", "{2B03333F-3ACD-474C-862B-FA97D3BA03B5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.StackExchangeRedis", "src\Microsoft.AspNetCore.SignalR.StackExchangeRedis\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj", "{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests", "test\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests.csproj", "{A5006087-81B0-4C62-B847-50ED5C37069D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -213,6 +217,14 @@ Global
{2B03333F-3ACD-474C-862B-FA97D3BA03B5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B03333F-3ACD-474C-862B-FA97D3BA03B5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B03333F-3ACD-474C-862B-FA97D3BA03B5}.Release|Any CPU.Build.0 = Release|Any CPU
{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C}.Release|Any CPU.Build.0 = Release|Any CPU
{A5006087-81B0-4C62-B847-50ED5C37069D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A5006087-81B0-4C62-B847-50ED5C37069D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A5006087-81B0-4C62-B847-50ED5C37069D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A5006087-81B0-4C62-B847-50ED5C37069D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -247,6 +259,8 @@ Global
{8C75AC94-C980-4FE1-9F79-6CED3C8665CE} = {43F352F3-4E2B-4ED7-901B-36E6671251F5}
{8D3E3E7D-452B-44F4-86CA-111003EA11ED} = {43F352F3-4E2B-4ED7-901B-36E6671251F5}
{2B03333F-3ACD-474C-862B-FA97D3BA03B5} = {DA69F624-5398-4884-87E4-B816698CDE65}
{D1334F29-5C19-4C7B-B62D-0A2F23AFB31C} = {DA69F624-5398-4884-87E4-B816698CDE65}
{A5006087-81B0-4C62-B847-50ED5C37069D} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7945A4E4-ACDB-4F6E-95CA-6AC6E7C2CD59}

View File

@ -17,8 +17,9 @@
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Common\Microsoft.AspNetCore.SignalR.Common.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client.Core\Microsoft.AspNetCore.SignalR.Client.Core.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Protocols.MessagePack\Microsoft.AspNetCore.SignalR.Protocols.MessagePack.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Redis\Microsoft.AspNetCore.SignalR.Redis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj" />
<ProjectReference Include="..\..\test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />
<ProjectReference Include="..\..\test\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -10,7 +10,7 @@ using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis;
using Microsoft.AspNetCore.SignalR.StackExchangeRedis;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
@ -34,7 +34,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Params(2, 20)]
public int ProtocolCount { get; set; }
[GlobalSetup]
// Re-enable micro-benchmark when https://github.com/aspnet/SignalR/issues/3088 is fixed
// [GlobalSetup]
public void GlobalSetup()
{
var server = new TestRedisServer();
@ -90,7 +91,7 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
_users.Add("EvenUser");
_users.Add("OddUser");
_args = new object[] {"Foo"};
_args = new object[] { "Foo" };
}
private IEnumerable<IHubProtocol> GenerateProtocols(int protocolCount)
@ -111,55 +112,55 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
}
}
[Benchmark]
//[Benchmark]
public async Task SendAll()
{
await _manager1.SendAllAsync("Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendGroup()
{
await _manager1.SendGroupAsync("Everyone", "Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendUser()
{
await _manager1.SendUserAsync("EvenUser", "Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendConnection()
{
await _manager1.SendConnectionAsync(_clients[0].Connection.ConnectionId, "Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendConnections()
{
await _manager1.SendConnectionsAsync(_sendIds, "Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendAllExcept()
{
await _manager1.SendAllExceptAsync("Test", _args, _excludedConnectionIds);
}
[Benchmark]
//[Benchmark]
public async Task SendGroupExcept()
{
await _manager1.SendGroupExceptAsync("Everyone", "Test", _args, _excludedConnectionIds);
}
[Benchmark]
//[Benchmark]
public async Task SendGroups()
{
await _manager1.SendGroupsAsync(_groups, "Test", _args);
}
[Benchmark]
//[Benchmark]
public async Task SendUsers()
{
await _manager1.SendUsersAsync(_users, "Test", _args);

View File

@ -7,7 +7,7 @@ using System.Collections.Generic;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal;
namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
{

View File

@ -62,6 +62,7 @@
<NETStandardLibrary20PackageVersion>2.0.3</NETStandardLibrary20PackageVersion>
<NewtonsoftJsonPackageVersion>11.0.2</NewtonsoftJsonPackageVersion>
<StackExchangeRedisStrongNamePackageVersion>1.2.6</StackExchangeRedisStrongNamePackageVersion>
<StackExchangeRedisPackageVersion>2.0.513</StackExchangeRedisPackageVersion>
<SystemBuffersPackageVersion>4.6.0-preview1-26907-04</SystemBuffersPackageVersion>
<SystemIOPipelinesPackageVersion>4.6.0-preview1-26907-04</SystemIOPipelinesPackageVersion>
<SystemMemoryPackageVersion>4.6.0-preview1-26717-04</SystemMemoryPackageVersion>

View File

@ -7,7 +7,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
class CallbackMap {
private Map<String, List<InvocationHandler>> handlers = new ConcurrentHashMap<>();
private final Map<String, List<InvocationHandler>> handlers = new ConcurrentHashMap<>();
public InvocationHandler put(String target, ActionBase action, Class<?>... classes) {
InvocationHandler handler = new InvocationHandler(action, classes);

View File

@ -22,8 +22,8 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
class DefaultHttpClient extends HttpClient {
private OkHttpClient client;
final class DefaultHttpClient extends HttpClient {
private final OkHttpClient client;
private Logger logger;
public DefaultHttpClient(Logger logger) {

View File

@ -5,7 +5,7 @@ package com.microsoft.signalr;
import com.google.gson.Gson;
class HandshakeProtocol {
final class HandshakeProtocol {
private static final Gson gson = new Gson();
private static final String RECORD_SEPARATOR = "\u001e";

View File

@ -3,18 +3,18 @@
package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.time.Duration;
import io.reactivex.Single;
public class HttpHubConnectionBuilder {
private String url;
private final String url;
private Transport transport;
private Logger logger;
private HttpClient httpClient;
private boolean skipNegotiate;
private Single<String> accessTokenProvider;
private Duration handshakeResponseTimeout;
HttpHubConnectionBuilder(String url) {
this.url = url;
@ -56,7 +56,12 @@ public class HttpHubConnectionBuilder {
return this;
}
public HttpHubConnectionBuilder withHandshakeResponseTimeout(Duration timeout) {
this.handshakeResponseTimeout = timeout;
return this;
}
public HubConnection build() {
return new HubConnection(url, transport, skipNegotiate, logger, httpClient, accessTokenProvider);
return new HubConnection(url, transport, skipNegotiate, logger, httpClient, accessTokenProvider, handshakeResponseTimeout);
}
}

View File

@ -3,13 +3,17 @@
package com.microsoft.signalr;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -19,27 +23,56 @@ import io.reactivex.Single;
public class HubConnection {
private static final String RECORD_SEPARATOR = "\u001e";
private static List<Class<?>> emptyArray = new ArrayList<>();
private static int MAX_NEGOTIATE_ATTEMPTS = 100;
private static final List<Class<?>> emptyArray = new ArrayList<>();
private static final int MAX_NEGOTIATE_ATTEMPTS = 100;
private String baseUrl;
private final String baseUrl;
private Transport transport;
private OnReceiveCallBack callback;
private CallbackMap handlers = new CallbackMap();
private final CallbackMap handlers = new CallbackMap();
private HubProtocol protocol;
private Boolean handshakeReceived = false;
private HubConnectionState hubConnectionState = HubConnectionState.DISCONNECTED;
private Lock hubConnectionStateLock = new ReentrantLock();
private final Lock hubConnectionStateLock = new ReentrantLock();
private Logger logger;
private List<Consumer<Exception>> onClosedCallbackList;
private boolean skipNegotiate;
private final boolean skipNegotiate;
private Single<String> accessTokenProvider;
private Map<String, String> headers = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private ConnectionState connectionState = null;
private HttpClient httpClient;
private final HttpClient httpClient;
private String stopError;
private Timer pingTimer = null;
private final AtomicLong nextServerTimeout = new AtomicLong();
private final AtomicLong nextPingActivation = new AtomicLong();
private Duration keepAliveInterval = Duration.ofSeconds(15);
private Duration serverTimeout = Duration.ofSeconds(30);
private Duration tickRate = Duration.ofSeconds(1);
private CompletableFuture<Void> handshakeResponseFuture;
private Duration handshakeResponseTimeout = Duration.ofSeconds(15);
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider) {
public void setServerTimeout(Duration serverTimeout) {
this.serverTimeout = serverTimeout;
}
public Duration getServerTimeout() {
return this.serverTimeout;
}
public void setKeepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}
public Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
// For testing purposes
void setTickRate(Duration tickRate) {
this.tickRate = tickRate;
}
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider, Duration handshakeResponseTimeout) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
}
@ -69,19 +102,34 @@ public class HubConnection {
this.transport = transport;
}
if (handshakeResponseTimeout != null) {
this.handshakeResponseTimeout = handshakeResponseTimeout;
}
this.skipNegotiate = skipNegotiate;
this.callback = (payload) -> {
resetServerTimeout();
if (!handshakeReceived) {
int handshakeLength = payload.indexOf(RECORD_SEPARATOR) + 1;
String handshakeResponseString = payload.substring(0, handshakeLength - 1);
HandshakeResponseMessage handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
HandshakeResponseMessage handshakeResponse;
try {
handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
} catch (RuntimeException ex) {
RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex);
handshakeResponseFuture.completeExceptionally(exception);
throw exception;
}
if (handshakeResponse.getHandshakeError() != null) {
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
logger.log(LogLevel.Error, errorMessage);
throw new RuntimeException(errorMessage);
RuntimeException exception = new RuntimeException(errorMessage);
handshakeResponseFuture.completeExceptionally(exception);
throw exception;
}
handshakeReceived = true;
handshakeResponseFuture.complete(null);
payload = payload.substring(handshakeLength);
// The payload only contained the handshake response so we can return.
@ -134,6 +182,12 @@ public class HubConnection {
};
}
private void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.schedule(() -> handshakeResponseFuture.completeExceptionally(
new TimeoutException("Timed out waiting for the server to respond to the handshake message.")), timeout, unit);
}
private CompletableFuture<NegotiateResponse> handleNegotiate(String url) {
HttpRequest request = new HttpRequest();
request.addHeaders(this.headers);
@ -142,12 +196,7 @@ public class HubConnection {
if (response.getStatusCode() != 200) {
throw new RuntimeException(String.format("Unexpected status code returned from negotiate: %d %s.", response.getStatusCode(), response.getStatusText()));
}
NegotiateResponse negotiateResponse;
try {
negotiateResponse = new NegotiateResponse(response.getContent());
} catch (IOException e) {
throw new RuntimeException(e);
}
NegotiateResponse negotiateResponse = new NegotiateResponse(response.getContent());
if (negotiateResponse.getError() != null) {
throw new RuntimeException(negotiateResponse.getError());
@ -184,8 +233,9 @@ public class HubConnection {
return Completable.complete();
}
handshakeResponseFuture = new CompletableFuture<>();
handshakeReceived = false;
CompletableFuture<Void> tokenFuture = new CompletableFuture<>();
CompletableFuture<Void> tokenFuture = new CompletableFuture<>();
accessTokenProvider.subscribe(token -> {
if (token != null && !token.isEmpty()) {
this.headers.put("Authorization", "Bearer " + token);
@ -213,15 +263,41 @@ public class HubConnection {
return transport.start(url).thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
} finally {
hubConnectionStateLock.unlock();
}
return transport.send(handshake).thenCompose((innerFuture) -> {
timeoutHandshakeResponse(handshakeResponseTimeout.toMillis(), TimeUnit.MILLISECONDS);
return handshakeResponseFuture.thenRun(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
resetServerTimeout();
this.pingTimer = new Timer();
this.pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() > nextServerTimeout.get()) {
stop("Server timeout elapsed without receiving a message from the server.");
return;
}
if (System.currentTimeMillis() > nextPingActivation.get()) {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.log(LogLevel.Warning, String.format("Error sending ping: %s", e.getMessage()));
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
}
}
}, new Date(0), tickRate.toMillis());
} finally {
hubConnectionStateLock.unlock();
}
});
});
});
}));
@ -308,6 +384,7 @@ public class HubConnection {
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
hubConnectionState = HubConnectionState.DISCONNECTED;
handshakeResponseFuture.complete(null);
} finally {
hubConnectionStateLock.unlock();
}
@ -326,18 +403,18 @@ public class HubConnection {
*
* @param method The name of the server method to invoke.
* @param args The arguments to be passed to the method.
* @throws Exception If there was an error while sending.
*/
public void send(String method, Object... args) throws Exception {
public void send(String method, Object... args) {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new HubException("The 'send' method cannot be called if the connection is not active");
throw new RuntimeException("The 'send' method cannot be called if the connection is not active");
}
InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
sendHubMessage(invocationMessage);
}
public <T> Single<T> invoke(Class<T> returnType, String method, Object... args) throws Exception {
@SuppressWarnings("unchecked")
public <T> Single<T> invoke(Class<T> returnType, String method, Object... args) {
String id = connectionState.getNextInvocationId();
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);
@ -368,14 +445,24 @@ public class HubConnection {
return Single.fromFuture(future);
}
private void sendHubMessage(HubMessage message) throws Exception {
private void sendHubMessage(HubMessage message) {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
logger.log(LogLevel.Debug, "Sending %d message '%s'.", message.getMessageType().value, ((InvocationMessage)message).getInvocationId());
logger.log(LogLevel.Debug, "Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
} else {
logger.log(LogLevel.Debug, "Sending %d message.", message.getMessageType().value);
logger.log(LogLevel.Debug, "Sending %s message.", message.getMessageType().name());
}
transport.send(serializedMessage);
resetKeepAlive();
}
private void resetServerTimeout() {
this.nextServerTimeout.set(System.currentTimeMillis() + serverTimeout.toMillis());
}
private void resetKeepAlive() {
this.nextPingActivation.set(System.currentTimeMillis() + keepAliveInterval.toMillis());
}
/**
@ -684,7 +771,7 @@ public class HubConnection {
}
@Override
public List<Class<?>> getParameterTypes(String methodName) throws Exception {
public List<Class<?>> getParameterTypes(String methodName) {
List<InvocationHandler> handlers = connection.handlers.get(methodName);
if (handlers == null) {
logger.log(LogLevel.Warning, "Failed to find handler for '%s' method.", methodName);
@ -692,7 +779,7 @@ public class HubConnection {
}
if (handlers.isEmpty()) {
throw new Exception(String.format("There are no callbacks registered for the method '%s'.", methodName));
throw new RuntimeException(String.format("There are no callbacks registered for the method '%s'.", methodName));
}
return handlers.get(0).getClasses();

View File

@ -4,12 +4,11 @@
package com.microsoft.signalr;
public abstract class HubConnectionBuilder {
public static HttpHubConnectionBuilder create(String url) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
}
return new HttpHubConnectionBuilder(url);
return new HttpHubConnectionBuilder(url);
}
public abstract HubConnection build();

View File

@ -3,7 +3,9 @@
package com.microsoft.signalr;
public class HubException extends Exception {
public class HubException extends RuntimeException {
private static final long serialVersionUID = -572019264269821519L;
public HubException() {
}

View File

@ -16,7 +16,7 @@ interface HubProtocol {
* @param message A string representation of one or more {@link HubMessage}s.
* @return A list of {@link HubMessage}s.
*/
HubMessage[] parseMessages(String message, InvocationBinder binder) throws Exception;
HubMessage[] parseMessages(String message, InvocationBinder binder);
/**
* Writes the specified {@link HubMessage} to a String.

View File

@ -7,5 +7,5 @@ import java.util.List;
interface InvocationBinder {
Class<?> getReturnType(String invocationId);
List<Class<?>> getParameterTypes(String methodName) throws Exception;
List<Class<?>> getParameterTypes(String methodName);
}

View File

@ -7,8 +7,8 @@ import java.util.Arrays;
import java.util.List;
class InvocationHandler {
private List<Class<?>> classes;
private ActionBase action;
private final List<Class<?>> classes;
private final ActionBase action;
InvocationHandler(ActionBase action, Class<?>... classes) {
this.action = action;

View File

@ -6,9 +6,9 @@ package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
class InvocationRequest {
private Class<?> returnType;
private CompletableFuture<Object> pendingCall = new CompletableFuture<>();
private String invocationId;
private final Class<?> returnType;
private final CompletableFuture<Object> pendingCall = new CompletableFuture<>();
private final String invocationId;
InvocationRequest(Class<?> returnType, String invocationId) {
this.returnType = returnType;

View File

@ -36,105 +36,109 @@ class JsonHubProtocol implements HubProtocol {
}
@Override
public HubMessage[] parseMessages(String payload, InvocationBinder binder) throws Exception {
public HubMessage[] parseMessages(String payload, InvocationBinder binder) {
if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) {
throw new RuntimeException("Message is incomplete.");
}
String[] messages = payload.split(RECORD_SEPARATOR);
List<HubMessage> hubMessages = new ArrayList<>();
for (String str : messages) {
HubMessageType messageType = null;
String invocationId = null;
String target = null;
String error = null;
ArrayList<Object> arguments = null;
JsonArray argumentsToken = null;
Object result = null;
JsonElement resultToken = null;
JsonReader reader = new JsonReader(new StringReader(str));
reader.beginObject();
try {
for (String str : messages) {
HubMessageType messageType = null;
String invocationId = null;
String target = null;
String error = null;
ArrayList<Object> arguments = null;
JsonArray argumentsToken = null;
Object result = null;
JsonElement resultToken = null;
JsonReader reader = new JsonReader(new StringReader(str));
reader.beginObject();
do {
String name = reader.nextName();
switch (name) {
case "type":
messageType = HubMessageType.values()[reader.nextInt() - 1];
break;
case "invocationId":
invocationId = reader.nextString();
break;
case "target":
target = reader.nextString();
break;
case "error":
error = reader.nextString();
break;
case "result":
if (invocationId == null) {
resultToken = jsonParser.parse(reader);
} else {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
}
break;
case "item":
reader.skipValue();
break;
case "arguments":
if (target != null) {
do {
String name = reader.nextName();
switch (name) {
case "type":
messageType = HubMessageType.values()[reader.nextInt() - 1];
break;
case "invocationId":
invocationId = reader.nextString();
break;
case "target":
target = reader.nextString();
break;
case "error":
error = reader.nextString();
break;
case "result":
if (invocationId == null) {
resultToken = jsonParser.parse(reader);
} else {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
}
break;
case "item":
reader.skipValue();
break;
case "arguments":
if (target != null) {
List<Class<?>> types = binder.getParameterTypes(target);
arguments = bindArguments(reader, types);
} else {
argumentsToken = (JsonArray)jsonParser.parse(reader);
}
break;
case "headers":
throw new RuntimeException("Headers not implemented yet.");
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
}
} while (reader.hasNext());
reader.endObject();
reader.close();
switch (messageType) {
case INVOCATION:
if (argumentsToken != null) {
List<Class<?>> types = binder.getParameterTypes(target);
arguments = bindArguments(reader, types);
arguments = bindArguments(argumentsToken, types);
}
if (arguments == null) {
hubMessages.add(new InvocationMessage(invocationId, target, new Object[0]));
} else {
argumentsToken = (JsonArray)jsonParser.parse(reader);
hubMessages.add(new InvocationMessage(invocationId, target, arguments.toArray()));
}
break;
case COMPLETION:
if (resultToken != null) {
result = gson.fromJson(resultToken, binder.getReturnType(invocationId));
}
hubMessages.add(new CompletionMessage(invocationId, result, error));
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
case CANCEL_INVOCATION:
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", messageType));
case PING:
hubMessages.add(PingMessage.getInstance());
break;
case CLOSE:
if (error != null) {
hubMessages.add(new CloseMessage(error));
} else {
hubMessages.add(new CloseMessage());
}
break;
case "headers":
throw new HubException("Headers not implemented yet.");
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
}
} while (reader.hasNext());
reader.endObject();
reader.close();
switch (messageType) {
case INVOCATION:
if (argumentsToken != null) {
List<Class<?>> types = binder.getParameterTypes(target);
arguments = bindArguments(argumentsToken, types);
}
if (arguments == null) {
hubMessages.add(new InvocationMessage(invocationId, target, new Object[0]));
} else {
hubMessages.add(new InvocationMessage(invocationId, target, arguments.toArray()));
}
break;
case COMPLETION:
if (resultToken != null) {
result = gson.fromJson(resultToken, binder.getReturnType(invocationId));
}
hubMessages.add(new CompletionMessage(invocationId, result, error));
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
case CANCEL_INVOCATION:
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", messageType));
case PING:
hubMessages.add(PingMessage.getInstance());
break;
case CLOSE:
if (error != null) {
hubMessages.add(new CloseMessage(error));
} else {
hubMessages.add(new CloseMessage());
}
break;
default:
break;
}
} catch (IOException ex) {
throw new RuntimeException("Error reading JSON.", ex);
}
return hubMessages.toArray(new HubMessage[hubMessages.size()]);

View File

@ -17,60 +17,64 @@ class NegotiateResponse {
private String accessToken;
private String error;
public NegotiateResponse(String negotiatePayload) throws IOException {
JsonReader reader = new JsonReader(new StringReader(negotiatePayload));
reader.beginObject();
public NegotiateResponse(String negotiatePayload) {
try {
JsonReader reader = new JsonReader(new StringReader(negotiatePayload));
reader.beginObject();
do {
String name = reader.nextName();
switch (name) {
case "error":
this.error = reader.nextString();
break;
case "url":
this.redirectUrl = reader.nextString();
break;
case "accessToken":
this.accessToken = reader.nextString();
break;
case "availableTransports":
reader.beginArray();
while (reader.hasNext()) {
reader.beginObject();
do {
String name = reader.nextName();
switch (name) {
case "error":
this.error = reader.nextString();
break;
case "url":
this.redirectUrl = reader.nextString();
break;
case "accessToken":
this.accessToken = reader.nextString();
break;
case "availableTransports":
reader.beginArray();
while (reader.hasNext()) {
String transport = null;
String property = reader.nextName();
switch (property) {
case "transport":
transport = reader.nextString();
break;
case "transferFormats":
// transfer formats aren't supported currently
reader.skipValue();
break;
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
reader.beginObject();
while (reader.hasNext()) {
String transport = null;
String property = reader.nextName();
switch (property) {
case "transport":
transport = reader.nextString();
break;
case "transferFormats":
// transfer formats aren't supported currently
reader.skipValue();
break;
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
}
this.availableTransports.add(transport);
}
this.availableTransports.add(transport);
reader.endObject();
}
reader.endObject();
}
reader.endArray();
break;
case "connectionId":
this.connectionId = reader.nextString();
break;
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
}
} while (reader.hasNext());
reader.endArray();
break;
case "connectionId":
this.connectionId = reader.nextString();
break;
default:
// Skip unknown property, allows new clients to still work with old protocols
reader.skipValue();
break;
}
} while (reader.hasNext());
reader.endObject();
reader.close();
reader.endObject();
reader.close();
} catch (IOException ex) {
throw new RuntimeException("Error reading NegotiateResponse", ex);
}
}
public String getConnectionId() {

View File

@ -4,5 +4,5 @@
package com.microsoft.signalr;
interface OnReceiveCallBack {
void invoke(String message) throws Exception;
void invoke(String message);
}

View File

@ -5,6 +5,8 @@ package com.microsoft.signalr;
class PingMessage extends HubMessage
{
private final int type = HubMessageType.PING.value;
private static PingMessage instance = new PingMessage();
private PingMessage()

View File

@ -4,8 +4,7 @@
package com.microsoft.signalr;
class StreamInvocationMessage extends InvocationMessage {
int type = HubMessageType.STREAM_INVOCATION.value;
private final int type = HubMessageType.STREAM_INVOCATION.value;
public StreamInvocationMessage(String invocationId, String target, Object[] arguments) {
super(invocationId, target, arguments);

View File

@ -6,9 +6,9 @@ package com.microsoft.signalr;
import java.util.List;
public class Subscription {
private CallbackMap handlers;
private InvocationHandler handler;
private String target;
private final CallbackMap handlers;
private final InvocationHandler handler;
private final String target;
public Subscription(CallbackMap handlers, InvocationHandler handler, String target) {
this.handlers = handlers;

View File

@ -3,7 +3,7 @@
package com.microsoft.signalr;
public enum TransferFormat {
enum TransferFormat {
TEXT,
BINARY
}

View File

@ -10,7 +10,7 @@ interface Transport {
CompletableFuture<Void> start(String url);
CompletableFuture<Void> send(String message);
void setOnReceive(OnReceiveCallBack callback);
void onReceive(String message) throws Exception;
void onReceive(String message);
void setOnClose(Consumer<String> onCloseCallback);
CompletableFuture<Void> stop();
}

View File

@ -13,8 +13,8 @@ class WebSocketTransport implements Transport {
private Consumer<String> onClose;
private String url;
private Logger logger;
private HttpClient client;
private Map<String, String> headers;
private final HttpClient client;
private final Map<String, String> headers;
private static final String HTTP = "http";
private static final String HTTPS = "https";
@ -68,7 +68,7 @@ class WebSocketTransport implements Transport {
}
@Override
public void onReceive(String message) throws Exception {
public void onReceive(String message) {
this.onReceiveCallBack.invoke(message);
}

View File

@ -29,4 +29,10 @@ class HandshakeProtocolTest {
HandshakeResponseMessage hsr = HandshakeProtocol.parseHandshakeResponse(handshakeResponseWithError);
assertEquals(hsr.getHandshakeError(), "Requested protocol 'messagepack' is not available.");
}
@Test
public void InvalidHandshakeResponse() {
String handshakeResponseWithError = "{\"error\": \"Requested proto";
Throwable exception = assertThrows(RuntimeException.class, ()-> HandshakeProtocol.parseHandshakeResponse(handshakeResponseWithError));
}
}

View File

@ -5,6 +5,7 @@ package com.microsoft.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@ -23,7 +24,7 @@ class HubConnectionTest {
private static final String RECORD_SEPARATOR = "\u001e";
@Test
public void checkHubConnectionState() throws Exception {
public void checkHubConnectionState() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -33,7 +34,7 @@ class HubConnectionTest {
}
@Test
public void transportCloseTriggersStopInHubConnection() throws Exception {
public void transportCloseTriggersStopInHubConnection() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
@ -44,7 +45,7 @@ class HubConnectionTest {
}
@Test
public void transportCloseWithErrorTriggersStopInHubConnection() throws Exception {
public void transportCloseWithErrorTriggersStopInHubConnection() {
MockTransport mockTransport = new MockTransport();
AtomicReference<String> message = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -58,11 +59,26 @@ class HubConnectionTest {
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
mockTransport.stopWithError(errorMessage);
assertEquals(errorMessage, message.get());
}
@Test
public void checkHubConnectionStateNoHandShakeResponse() {
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = HubConnectionBuilder.create("http://example.com")
.withTransport(mockTransport)
.withHttpClient(new TestHttpClient())
.shouldSkipNegotiate(true)
.withHandshakeResponseTimeout(Duration.ofMillis(100))
.build();
Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals(ExecutionException.class, exception.getCause().getClass());
assertEquals(TimeoutException.class, exception.getCause().getCause().getClass());
assertEquals(exception.getCause().getCause().getMessage(), "Timed out waiting for the server to respond to the handshake message.");
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void constructHubConnectionWithHttpConnectionOptions() throws Exception {
public void constructHubConnectionWithHttpConnectionOptions() {
Transport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -74,12 +90,11 @@ class HubConnectionTest {
}
@Test
public void hubConnectionClosesAfterCloseMessage() throws Exception {
public void hubConnectionClosesAfterCloseMessage() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -88,9 +103,21 @@ class HubConnectionTest {
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void invalidHandShakeResponse() {
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
Throwable exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{" + RECORD_SEPARATOR));
assertEquals("An invalid handshake response was received from the server.", exception.getMessage());
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void hubConnectionReceiveHandshakeResponseWithError() {
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
@ -99,7 +126,7 @@ class HubConnectionTest {
}
@Test
public void registeringMultipleHandlersAndBothGetTriggered() throws Exception {
public void registeringMultipleHandlersAndBothGetTriggered() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -117,7 +144,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -125,7 +151,7 @@ class HubConnectionTest {
}
@Test
public void removeHandlerByName() throws Exception {
public void removeHandlerByName() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -141,7 +167,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -152,7 +177,7 @@ class HubConnectionTest {
}
@Test
public void addAndRemoveHandlerImmediately() throws Exception {
public void addAndRemoveHandlerImmediately() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -169,7 +194,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that the handler was removed.
@ -177,7 +201,7 @@ class HubConnectionTest {
}
@Test
public void removingMultipleHandlersWithOneCallToRemove() throws Exception {
public void removingMultipleHandlersWithOneCallToRemove() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -195,7 +219,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
assertEquals(Double.valueOf(3), value.get());
@ -209,7 +232,7 @@ class HubConnectionTest {
}
@Test
public void removeHandlerWithUnsubscribe() throws Exception {
public void removeHandlerWithUnsubscribe() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -225,7 +248,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -242,7 +264,7 @@ class HubConnectionTest {
}
@Test
public void unsubscribeTwice() throws Exception {
public void unsubscribeTwice() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -258,7 +280,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -276,7 +297,7 @@ class HubConnectionTest {
}
@Test
public void removeSingleHandlerWithUnsubscribe() throws Exception {
public void removeSingleHandlerWithUnsubscribe() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -294,7 +315,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
assertEquals(Double.valueOf(3), value.get());
@ -306,7 +326,7 @@ class HubConnectionTest {
}
@Test
public void addAndRemoveHandlerImmediatelyWithSubscribe() throws Exception {
public void addAndRemoveHandlerImmediatelyWithSubscribe() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -318,7 +338,6 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
try {
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
@ -331,7 +350,7 @@ class HubConnectionTest {
}
@Test
public void registeringMultipleHandlersThatTakeParamsAndBothGetTriggered() throws Exception {
public void registeringMultipleHandlersThatTakeParamsAndBothGetTriggered() {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -343,7 +362,6 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -351,12 +369,11 @@ class HubConnectionTest {
}
@Test
public void invokeWaitsForCompletionMessage() throws Exception {
public void invokeWaitsForCompletionMessage() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
@ -370,12 +387,11 @@ class HubConnectionTest {
}
@Test
public void multipleInvokesWaitForOwnCompletionMessage() throws Exception {
public void multipleInvokesWaitForOwnCompletionMessage() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean doneFirst = new AtomicBoolean();
AtomicBoolean doneSecond = new AtomicBoolean();
@ -397,12 +413,11 @@ class HubConnectionTest {
}
@Test
public void invokeWorksForPrimitiveTypes() throws Exception {
public void invokeWorksForPrimitiveTypes() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
// int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type
@ -417,12 +432,11 @@ class HubConnectionTest {
}
@Test
public void completionMessageCanHaveError() throws Exception {
public void completionMessageCanHaveError() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -443,12 +457,11 @@ class HubConnectionTest {
}
@Test
public void stopCancelsActiveInvokes() throws Exception {
public void stopCancelsActiveInvokes() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -469,7 +482,7 @@ class HubConnectionTest {
}
@Test
public void sendWithNoParamsTriggersOnHandler() throws Exception {
public void sendWithNoParamsTriggersOnHandler() {
AtomicReference<Integer> value = new AtomicReference<>(0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -480,7 +493,6 @@ class HubConnectionTest {
});
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -488,7 +500,7 @@ class HubConnectionTest {
}
@Test
public void sendWithParamTriggersOnHandler() throws Exception {
public void sendWithParamTriggersOnHandler() {
AtomicReference<String> value = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
@ -499,7 +511,6 @@ class HubConnectionTest {
}, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World");
@ -508,7 +519,7 @@ class HubConnectionTest {
}
@Test
public void sendWithTwoParamsTriggersOnHandler() throws Exception {
public void sendWithTwoParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<Double> value2 = new AtomicReference<>();
@ -524,7 +535,6 @@ class HubConnectionTest {
}, String.class, Double.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\", 12]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World", 12);
@ -534,7 +544,7 @@ class HubConnectionTest {
}
@Test
public void sendWithThreeParamsTriggersOnHandler() throws Exception {
public void sendWithThreeParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -553,7 +563,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "A", "B", "C");
@ -564,7 +573,7 @@ class HubConnectionTest {
}
@Test
public void sendWithFourParamsTriggersOnHandler() throws Exception {
public void sendWithFourParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -586,7 +595,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\", \"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -597,7 +605,7 @@ class HubConnectionTest {
}
@Test
public void sendWithFiveParamsTriggersOnHandler() throws Exception {
public void sendWithFiveParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -622,7 +630,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12 ]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -634,7 +641,7 @@ class HubConnectionTest {
}
@Test
public void sendWithSixParamsTriggersOnHandler() throws Exception {
public void sendWithSixParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -662,7 +669,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -675,7 +681,7 @@ class HubConnectionTest {
}
@Test
public void sendWithSevenParamsTriggersOnHandler() throws Exception {
public void sendWithSevenParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -706,7 +712,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -720,7 +725,7 @@ class HubConnectionTest {
}
@Test
public void sendWithEightParamsTriggersOnHandler() throws Exception {
public void sendWithEightParamsTriggersOnHandler() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
AtomicReference<String> value3 = new AtomicReference<>();
@ -754,7 +759,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\",\"F\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
assertEquals("A", value1.get());
@ -774,7 +778,7 @@ class HubConnectionTest {
}
@Test
public void sendWithCustomObjectTriggersOnHandler() throws Exception {
public void sendWithCustomObjectTriggersOnHandler() {
AtomicReference<Custom> value1 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
@ -787,7 +791,6 @@ class HubConnectionTest {
}, Custom.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[{\"number\":1,\"str\":\"A\",\"bools\":[true,false]}]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -800,9 +803,9 @@ class HubConnectionTest {
}
@Test
public void receiveHandshakeResponseAndMessage() throws Exception {
public void receiveHandshakeResponseAndMessage() {
AtomicReference<Double> value = new AtomicReference<Double>(0.0);
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.on("inc", () ->{
@ -822,7 +825,7 @@ class HubConnectionTest {
}
@Test
public void onClosedCallbackRunsWhenStopIsCalled() throws Exception {
public void onClosedCallbackRunsWhenStopIsCalled() {
AtomicReference<String> value1 = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
@ -837,7 +840,7 @@ class HubConnectionTest {
}
@Test
public void multipleOnClosedCallbacksRunWhenStopIsCalled() throws Exception {
public void multipleOnClosedCallbacksRunWhenStopIsCalled() {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
@ -863,14 +866,13 @@ class HubConnectionTest {
}
@Test
public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError() throws Exception {
public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.onClosed((ex) -> {
assertEquals(ex.getMessage(), "There was an error");
});
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -880,7 +882,7 @@ class HubConnectionTest {
}
@Test
public void callingStartOnStartedHubConnectionNoOps() throws Exception {
public void callingStartOnStartedHubConnectionNoOps() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -893,16 +895,16 @@ class HubConnectionTest {
}
@Test
public void cannotSendBeforeStart() throws Exception {
public void cannotSendBeforeStart() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
Throwable exception = assertThrows(HubException.class, () -> hubConnection.send("inc"));
Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.send("inc"));
assertEquals("The 'send' method cannot be called if the connection is not active", exception.getMessage());
}
@Test
public void errorWhenReceivingInvokeWithIncorrectArgumentLength() throws Exception {
public void errorWhenReceivingInvokeWithIncorrectArgumentLength() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.on("Send", (s) -> {
@ -910,7 +912,6 @@ class HubConnectionTest {
}, String.class);
Completable startFuture = hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS);
RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR));
@ -937,7 +938,7 @@ class HubConnectionTest {
}
@Test
public void negotiateThatRedirectsForeverFailsAfter100Tries() throws InterruptedException, TimeoutException, Exception {
public void negotiateThatRedirectsForeverFailsAfter100Tries() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}")));
@ -952,13 +953,13 @@ class HubConnectionTest {
}
@Test
public void afterSuccessfulNegotiateConnectsWithTransport() throws InterruptedException, TimeoutException, Exception {
public void afterSuccessfulNegotiateConnectsWithTransport() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "",
"{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
MockTransport transport = new MockTransport();
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(transport)
@ -977,7 +978,7 @@ class HubConnectionTest {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"error\":\"Test error.\"}")));
MockTransport transport = new MockTransport();
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withHttpClient(client)
@ -990,14 +991,14 @@ class HubConnectionTest {
}
@Test
public void negotiateRedirectIsFollowed() throws Exception {
public void negotiateRedirectIsFollowed() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
MockTransport transport = new MockTransport();
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(transport)
@ -1010,8 +1011,7 @@ class HubConnectionTest {
}
@Test
public void accessTokenProviderIsUsedForNegotiate()
throws InterruptedException, ExecutionException, TimeoutException, Exception {
public void accessTokenProviderIsUsedForNegotiate() {
AtomicReference<String> token = new AtomicReference<>();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate",
@ -1022,7 +1022,7 @@ class HubConnectionTest {
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
MockTransport transport = new MockTransport();
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(transport)
@ -1037,7 +1037,7 @@ class HubConnectionTest {
}
@Test
public void accessTokenProviderIsOverriddenFromRedirectNegotiate() throws Exception {
public void accessTokenProviderIsOverriddenFromRedirectNegotiate() {
AtomicReference<String> token = new AtomicReference<>();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"newToken\"}")))
@ -1048,7 +1048,7 @@ class HubConnectionTest {
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
MockTransport transport = new MockTransport();
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(transport)
@ -1064,7 +1064,41 @@ class HubConnectionTest {
}
@Test
public void hubConnectionCanBeStartedAfterBeingStopped() throws Exception {
public void connectionTimesOutIfServerDoesNotSendMessage() throws InterruptedException, ExecutionException, TimeoutException {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.setServerTimeout(Duration.ofMillis(1));
hubConnection.setTickRate(Duration.ofMillis(1));
CompletableFuture<Exception> closedFuture = new CompletableFuture<>();
hubConnection.onClosed((e) -> {
closedFuture.complete(e);
});
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals("Server timeout elapsed without receiving a message from the server.", closedFuture.get(1000, TimeUnit.MILLISECONDS).getMessage());
}
@Test
public void connectionSendsPingsRegularly() throws InterruptedException {
MockTransport mockTransport = new MockTransport(true, false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.setKeepAliveInterval(Duration.ofMillis(1));
hubConnection.setTickRate(Duration.ofMillis(1));
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(100);
hubConnection.stop();
String[] sentMessages = mockTransport.getSentMessages();
assertTrue(sentMessages.length > 1);
for (int i = 1; i < sentMessages.length; i++) {
assertEquals("{\"type\":6}" + RECORD_SEPARATOR, sentMessages[i]);
}
}
@Test
public void hubConnectionCanBeStartedAfterBeingStopped() {
MockTransport transport = new MockTransport();
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
@ -1083,7 +1117,7 @@ class HubConnectionTest {
}
@Test
public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() throws Exception {
public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() {
MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> CompletableFuture

View File

@ -39,7 +39,7 @@ class JsonHubProtocolTest {
}
@Test
public void parsePingMessage() throws Exception {
public void parsePingMessage() {
String stringifiedMessage = "{\"type\":6}\u001E";
TestBinder binder = new TestBinder(PingMessage.getInstance());
@ -51,7 +51,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseCloseMessage() throws Exception {
public void parseCloseMessage() {
String stringifiedMessage = "{\"type\":7}\u001E";
TestBinder binder = new TestBinder(new CloseMessage());
@ -69,7 +69,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseCloseMessageWithError() throws Exception {
public void parseCloseMessageWithError() {
String stringifiedMessage = "{\"type\":7,\"error\": \"There was an error\"}\u001E";
TestBinder binder = new TestBinder(new CloseMessage("There was an error"));
@ -87,7 +87,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseSingleMessage() throws Exception {
public void parseSingleMessage() {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42 }));
@ -109,7 +109,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseSingleUnsupportedStreamItemMessage() throws Exception {
public void parseSingleUnsupportedStreamItemMessage() {
String stringifiedMessage = "{\"type\":2,\"Id\":1,\"Item\":42}\u001E";
TestBinder binder = new TestBinder(null);
@ -118,7 +118,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseSingleUnsupportedStreamInvocationMessage() throws Exception {
public void parseSingleUnsupportedStreamInvocationMessage() {
String stringifiedMessage = "{\"type\":4,\"Id\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";
TestBinder binder = new TestBinder(new StreamInvocationMessage("1", "test", new Object[] { 42 }));
@ -127,7 +127,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseSingleUnsupportedCancelInvocationMessage() throws Exception {
public void parseSingleUnsupportedCancelInvocationMessage() {
String stringifiedMessage = "{\"type\":5,\"invocationId\":123}\u001E";
TestBinder binder = new TestBinder(null);
@ -136,7 +136,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseTwoMessages() throws Exception {
public void parseTwoMessages() {
String twoMessages = "{\"type\":1,\"target\":\"one\",\"arguments\":[42]}\u001E{\"type\":1,\"target\":\"two\",\"arguments\":[43]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("1", "one", new Object[] { 42 }));
@ -167,7 +167,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseSingleMessageMutipleArgs() throws Exception {
public void parseSingleMessageMutipleArgs() {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42, 24]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42, 24 }));
@ -186,7 +186,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseMessageWithOutOfOrderProperties() throws Exception {
public void parseMessageWithOutOfOrderProperties() {
String stringifiedMessage = "{\"arguments\":[42, 24],\"type\":1,\"target\":\"test\"}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42, 24 }));
@ -205,7 +205,7 @@ class JsonHubProtocolTest {
}
@Test
public void parseCompletionMessageWithOutOfOrderProperties() throws Exception {
public void parseCompletionMessageWithOutOfOrderProperties() {
String stringifiedMessage = "{\"type\":3,\"result\":42,\"invocationId\":\"1\"}\u001E";
TestBinder binder = new TestBinder(new CompletionMessage("1", 42, null));
@ -220,7 +220,7 @@ class JsonHubProtocolTest {
}
@Test
public void errorWhileParsingTooManyArgumentsWithOutOfOrderProperties() throws Exception {
public void errorWhileParsingTooManyArgumentsWithOutOfOrderProperties() {
String stringifiedMessage = "{\"arguments\":[42, 24],\"type\":1,\"target\":\"test\"}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42 }));
@ -229,7 +229,7 @@ class JsonHubProtocolTest {
}
@Test
public void errorWhileParsingTooManyArguments() throws Exception {
public void errorWhileParsingTooManyArguments() {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42, 24]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42 }));
@ -238,7 +238,7 @@ class JsonHubProtocolTest {
}
@Test
public void errorWhileParsingTooFewArguments() throws Exception {
public void errorWhileParsingTooFewArguments() {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42, 24 }));
@ -247,7 +247,7 @@ class JsonHubProtocolTest {
}
@Test
public void errorWhileParsingIncompleteMessage() throws Exception {
public void errorWhileParsingIncompleteMessage() {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":";
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42, 24 }));

View File

@ -12,16 +12,42 @@ class MockTransport implements Transport {
private ArrayList<String> sentMessages = new ArrayList<>();
private String url;
private Consumer<String> onClose;
final private boolean ignorePings;
final private boolean autoHandshake;
private static final String RECORD_SEPARATOR = "\u001e";
public MockTransport() {
this(true, true);
}
public MockTransport(boolean autoHandshake) {
this(autoHandshake, true);
}
public MockTransport(boolean autoHandshake, boolean ignorePings) {
this.autoHandshake = autoHandshake;
this.ignorePings = ignorePings;
}
@Override
public CompletableFuture start(String url) {
public CompletableFuture<Void> start(String url) {
this.url = url;
if (autoHandshake) {
try {
onReceiveCallBack.invoke("{}" + RECORD_SEPARATOR);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture send(String message) {
sentMessages.add(message);
public CompletableFuture<Void> send(String message) {
if (!(ignorePings && message.equals("{\"type\":6}" + RECORD_SEPARATOR))) {
sentMessages.add(message);
}
return CompletableFuture.completedFuture(null);
}
@ -31,7 +57,7 @@ class MockTransport implements Transport {
}
@Override
public void onReceive(String message) throws Exception {
public void onReceive(String message) {
this.onReceiveCallBack.invoke(message);
}
@ -41,7 +67,7 @@ class MockTransport implements Transport {
}
@Override
public CompletableFuture stop() {
public CompletableFuture<Void> stop() {
onClose.accept(null);
return CompletableFuture.completedFuture(null);
}
@ -50,7 +76,7 @@ class MockTransport implements Transport {
onClose.accept(errorMessage);
}
public void receiveMessage(String message) throws Exception {
public void receiveMessage(String message) {
this.onReceive(message);
}

View File

@ -12,7 +12,7 @@ import org.junit.jupiter.api.Test;
class NegotiateResponseTest {
@Test
public void VerifyNegotiateResponse() throws IOException {
public void VerifyNegotiateResponse() {
String stringNegotiateResponse = "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" +
"availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}," +
"{\"transport\":\"ServerSentEvents\",\"transferFormats\":[\"Text\"]}," +
@ -27,7 +27,7 @@ class NegotiateResponseTest {
}
@Test
public void VerifyRedirectNegotiateResponse() throws IOException {
public void VerifyRedirectNegotiateResponse() {
String stringNegotiateResponse = "{\"url\":\"www.example.com\"," +
"\"accessToken\":\"some_access_token\"," +
"\"availableTransports\":[]}";
@ -40,7 +40,7 @@ class NegotiateResponseTest {
}
@Test
public void NegotiateResponseIgnoresExtraProperties() throws IOException {
public void NegotiateResponseIgnoresExtraProperties() {
String stringNegotiateResponse = "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\"," +
"\"extra\":\"something\"}";
NegotiateResponse negotiateResponse = new NegotiateResponse(stringNegotiateResponse);
@ -48,7 +48,7 @@ class NegotiateResponseTest {
}
@Test
public void NegotiateResponseIgnoresExtraComplexProperties() throws IOException {
public void NegotiateResponseIgnoresExtraComplexProperties() {
String stringNegotiateResponse = "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\"," +
"\"extra\":[\"something\"]}";
NegotiateResponse negotiateResponse = new NegotiateResponse(stringNegotiateResponse);

View File

@ -5,7 +5,7 @@ package com.microsoft.signalr;
class TestUtils {
static HubConnection createHubConnection(String url) {
return createHubConnection(url, new MockTransport(), new NullLogger(), true, new TestHttpClient());
return createHubConnection(url, new MockTransport(true), new NullLogger(), true, new TestHttpClient());
}
static HubConnection createHubConnection(String url, Transport transport) {

View File

@ -13,7 +13,7 @@ import org.junit.jupiter.api.Test;
class WebSocketTransportTest {
@Test
public void WebsocketThrowsIfItCantConnect() throws Exception {
public void WebsocketThrowsIfItCantConnect() {
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());

View File

@ -10,7 +10,7 @@ import com.microsoft.signalr.HubConnectionBuilder;
import com.microsoft.signalr.LogLevel;
public class Chat {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
System.out.println("Enter the URL of the SignalR Chat you want to join");
Scanner reader = new Scanner(System.in); // Reading from System.in
String input = reader.nextLine();

View File

@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;

View File

@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Redis.Internal
{
public readonly struct RedisGroupCommand

View File

@ -1,5 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal

View File

@ -8,7 +8,6 @@ using System.IO;
using System.Runtime.InteropServices;
using MessagePack;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.Redis.Internal

View File

@ -0,0 +1,117 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
internal class AckHandler : IDisposable
{
private readonly ConcurrentDictionary<int, AckInfo> _acks = new ConcurrentDictionary<int, AckInfo>();
private readonly Timer _timer;
private readonly TimeSpan _ackThreshold = TimeSpan.FromSeconds(30);
private readonly TimeSpan _ackInterval = TimeSpan.FromSeconds(5);
private readonly object _lock = new object();
private bool _disposed;
public AckHandler()
{
// Don't capture the current ExecutionContext and its AsyncLocals onto the timer
bool restoreFlow = false;
try
{
if (!ExecutionContext.IsFlowSuppressed())
{
ExecutionContext.SuppressFlow();
restoreFlow = true;
}
_timer = new Timer(state => ((AckHandler)state).CheckAcks(), state: this, dueTime: _ackInterval, period: _ackInterval);
}
finally
{
// Restore the current ExecutionContext
if (restoreFlow)
{
ExecutionContext.RestoreFlow();
}
}
}
public Task CreateAck(int id)
{
lock (_lock)
{
if (_disposed)
{
return Task.CompletedTask;
}
return _acks.GetOrAdd(id, _ => new AckInfo()).Tcs.Task;
}
}
public void TriggerAck(int id)
{
if (_acks.TryRemove(id, out var ack))
{
ack.Tcs.TrySetResult(null);
}
}
private void CheckAcks()
{
if (_disposed)
{
return;
}
var utcNow = DateTime.UtcNow;
foreach (var pair in _acks)
{
var elapsed = utcNow - pair.Value.Created;
if (elapsed > _ackThreshold)
{
if (_acks.TryRemove(pair.Key, out var ack))
{
ack.Tcs.TrySetCanceled();
}
}
}
}
public void Dispose()
{
lock (_lock)
{
_disposed = true;
_timer.Dispose();
foreach (var pair in _acks)
{
if (_acks.TryRemove(pair.Key, out var ack))
{
ack.Tcs.TrySetCanceled();
}
}
}
}
private class AckInfo
{
public TaskCompletionSource<object> Tcs { get; private set; }
public DateTime Created { get; private set; }
public AckInfo()
{
Created = DateTime.UtcNow;
Tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
// The size of the enum is defined by the protocol. Do not change it. If you need more than 255 items,
// add an additional enum.
public enum GroupAction : byte
{
// These numbers are used by the protocol, do not change them and always use explicit assignment
// when adding new items to this enum. 0 is intentionally omitted
Add = 1,
Remove = 2,
}
}

View File

@ -0,0 +1,68 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using MessagePack;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
internal static class MessagePackUtil
{
public static int ReadArrayHeader(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadArrayHeader(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static int ReadMapHeader(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadMapHeader(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static string ReadString(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadString(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static byte[] ReadBytes(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadBytes(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static int ReadInt32(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadInt32(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
public static byte ReadByte(ref ReadOnlyMemory<byte> data)
{
var arr = GetArray(data);
var val = MessagePackBinary.ReadByte(arr.Array, arr.Offset, out var readSize);
data = data.Slice(readSize);
return val;
}
private static ArraySegment<byte> GetArray(ReadOnlyMemory<byte> data)
{
var isArray = MemoryMarshal.TryGetArray(data, out var array);
Debug.Assert(isArray);
return array;
}
}
}

View File

@ -0,0 +1,75 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
internal class RedisChannels
{
private readonly string _prefix;
/// <summary>
/// Gets the name of the channel for sending to all connections.
/// </summary>
/// <remarks>
/// The payload on this channel is <see cref="RedisInvocation"/> objects containing
/// invocations to be sent to all connections
/// </remarks>
public string All { get; }
/// <summary>
/// Gets the name of the internal channel for group management messages.
/// </summary>
public string GroupManagement { get; }
public RedisChannels(string prefix)
{
_prefix = prefix;
All = prefix + ":all";
GroupManagement = prefix + ":internal:groups";
}
/// <summary>
/// Gets the name of the channel for sending a message to a specific connection.
/// </summary>
/// <param name="connectionId">The ID of the connection to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Connection(string connectionId)
{
return _prefix + ":connection:" + connectionId;
}
/// <summary>
/// Gets the name of the channel for sending a message to a named group of connections.
/// </summary>
/// <param name="groupName">The name of the group to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Group(string groupName)
{
return _prefix + ":group:" + groupName;
}
/// <summary>
/// Gets the name of the channel for sending a message to all collections associated with a user.
/// </summary>
/// <param name="userId">The ID of the user to get the channel for.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string User(string userId)
{
return _prefix + ":user:" + userId;
}
/// <summary>
/// Gets the name of the acknowledgement channel for the specified server.
/// </summary>
/// <param name="serverName">The name of the server to get the acknowledgement channel for.</param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string Ack(string serverName)
{
return _prefix + ":internal:ack:" + serverName;
}
}
}

View File

@ -0,0 +1,42 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
public readonly struct RedisGroupCommand
{
/// <summary>
/// Gets the ID of the group command.
/// </summary>
public int Id { get; }
/// <summary>
/// Gets the name of the server that sent the command.
/// </summary>
public string ServerName { get; }
/// <summary>
/// Gets the action to be performed on the group.
/// </summary>
public GroupAction Action { get; }
/// <summary>
/// Gets the group on which the action is performed.
/// </summary>
public string GroupName { get; }
/// <summary>
/// Gets the ID of the connection to be added or removed from the group.
/// </summary>
public string ConnectionId { get; }
public RedisGroupCommand(int id, string serverName, GroupAction action, string groupName, string connectionId)
{
Id = id;
ServerName = serverName;
Action = action;
GroupName = groupName;
ConnectionId = connectionId;
}
}
}

View File

@ -0,0 +1,35 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
public readonly struct RedisInvocation
{
/// <summary>
/// Gets a list of connections that should be excluded from this invocation.
/// May be null to indicate that no connections are to be excluded.
/// </summary>
public IReadOnlyList<string> ExcludedConnectionIds { get; }
/// <summary>
/// Gets the message serialization cache containing serialized payloads for the message.
/// </summary>
public SerializedHubMessage Message { get; }
public RedisInvocation(SerializedHubMessage message, IReadOnlyList<string> excludedConnectionIds)
{
Message = message;
ExcludedConnectionIds = excludedConnectionIds;
}
public static RedisInvocation Create(string target, object[] arguments, IReadOnlyList<string> excludedConnectionIds = null)
{
return new RedisInvocation(
new SerializedHubMessage(new InvocationMessage(target, null, arguments)),
excludedConnectionIds);
}
}
}

View File

@ -0,0 +1,119 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
// We don't want to use our nested static class here because RedisHubLifetimeManager is generic.
// We'd end up creating separate instances of all the LoggerMessage.Define values for each Hub.
internal static class RedisLog
{
private static readonly Action<ILogger, string, string, Exception> _connectingToEndpoints =
LoggerMessage.Define<string, string>(LogLevel.Information, new EventId(1, "ConnectingToEndpoints"), "Connecting to Redis endpoints: {Endpoints}. Using Server Name: {ServerName}");
private static readonly Action<ILogger, Exception> _connected =
LoggerMessage.Define(LogLevel.Information, new EventId(2, "Connected"), "Connected to Redis.");
private static readonly Action<ILogger, string, Exception> _subscribing =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(3, "Subscribing"), "Subscribing to channel: {Channel}.");
private static readonly Action<ILogger, string, Exception> _receivedFromChannel =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(4, "ReceivedFromChannel"), "Received message from Redis channel {Channel}.");
private static readonly Action<ILogger, string, Exception> _publishToChannel =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(5, "PublishToChannel"), "Publishing message to Redis channel {Channel}.");
private static readonly Action<ILogger, string, Exception> _unsubscribe =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(6, "Unsubscribe"), "Unsubscribing from channel: {Channel}.");
private static readonly Action<ILogger, Exception> _notConnected =
LoggerMessage.Define(LogLevel.Error, new EventId(7, "Connected"), "Not connected to Redis.");
private static readonly Action<ILogger, Exception> _connectionRestored =
LoggerMessage.Define(LogLevel.Information, new EventId(8, "ConnectionRestored"), "Connection to Redis restored.");
private static readonly Action<ILogger, Exception> _connectionFailed =
LoggerMessage.Define(LogLevel.Error, new EventId(9, "ConnectionFailed"), "Connection to Redis failed.");
private static readonly Action<ILogger, Exception> _failedWritingMessage =
LoggerMessage.Define(LogLevel.Warning, new EventId(10, "FailedWritingMessage"), "Failed writing message.");
private static readonly Action<ILogger, Exception> _internalMessageFailed =
LoggerMessage.Define(LogLevel.Warning, new EventId(11, "InternalMessageFailed"), "Error processing message for internal server message.");
public static void ConnectingToEndpoints(ILogger logger, EndPointCollection endpoints, string serverName)
{
if (logger.IsEnabled(LogLevel.Information))
{
if (endpoints.Count > 0)
{
_connectingToEndpoints(logger, string.Join(", ", endpoints.Select(e => EndPointCollection.ToString(e))), serverName, null);
}
}
}
public static void Connected(ILogger logger)
{
_connected(logger, null);
}
public static void Subscribing(ILogger logger, string channelName)
{
_subscribing(logger, channelName, null);
}
public static void ReceivedFromChannel(ILogger logger, string channelName)
{
_receivedFromChannel(logger, channelName, null);
}
public static void PublishToChannel(ILogger logger, string channelName)
{
_publishToChannel(logger, channelName, null);
}
public static void Unsubscribe(ILogger logger, string channelName)
{
_unsubscribe(logger, channelName, null);
}
public static void NotConnected(ILogger logger)
{
_notConnected(logger, null);
}
public static void ConnectionRestored(ILogger logger)
{
_connectionRestored(logger, null);
}
public static void ConnectionFailed(ILogger logger, Exception exception)
{
_connectionFailed(logger, exception);
}
public static void FailedWritingMessage(ILogger logger, Exception exception)
{
_failedWritingMessage(logger, exception);
}
public static void InternalMessageFailed(ILogger logger, Exception exception)
{
_internalMessageFailed(logger, exception);
}
// This isn't DefineMessage-based because it's just the simple TextWriter logging from ConnectionMultiplexer
public static void ConnectionMultiplexerMessage(ILogger logger, string message)
{
if (logger.IsEnabled(LogLevel.Debug))
{
// We tag it with EventId 100 though so it can be pulled out of logs easily.
logger.LogDebug(new EventId(100, "RedisConnectionLog"), message);
}
}
}
}

View File

@ -0,0 +1,208 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using MessagePack;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
public class RedisProtocol
{
private readonly IReadOnlyList<IHubProtocol> _protocols;
public RedisProtocol(IReadOnlyList<IHubProtocol> protocols)
{
_protocols = protocols;
}
// The Redis Protocol:
// * The message type is known in advance because messages are sent to different channels based on type
// * Invocations are sent to the All, Group, Connection and User channels
// * Group Commands are sent to the GroupManagement channel
// * Acks are sent to the Acknowledgement channel.
// * See the Write[type] methods for a description of the protocol for each in-depth.
// * The "Variable length integer" is the length-prefixing format used by BinaryReader/BinaryWriter:
// * https://docs.microsoft.com/en-us/dotnet/api/system.io.binarywriter.write?view=netstandard-2.0
// * The "Length prefixed string" is the string format used by BinaryReader/BinaryWriter:
// * A 7-bit variable length integer encodes the length in bytes, followed by the encoded string in UTF-8.
public byte[] WriteInvocation(string methodName, object[] args) =>
WriteInvocation(methodName, args, excludedConnectionIds: null);
public byte[] WriteInvocation(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
{
// Written as a MessagePack 'arr' containing at least these items:
// * A MessagePack 'arr' of 'str's representing the excluded ids
// * [The output of WriteSerializedHubMessage, which is an 'arr']
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 2);
if (excludedConnectionIds != null && excludedConnectionIds.Count > 0)
{
MessagePackBinary.WriteArrayHeader(writer, excludedConnectionIds.Count);
foreach (var id in excludedConnectionIds)
{
MessagePackBinary.WriteString(writer, id);
}
}
else
{
MessagePackBinary.WriteArrayHeader(writer, 0);
}
WriteSerializedHubMessage(writer,
new SerializedHubMessage(new InvocationMessage(methodName, args)));
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public byte[] WriteGroupCommand(RedisGroupCommand command)
{
// Written as a MessagePack 'arr' containing at least these items:
// * An 'int': the Id of the command
// * A 'str': The server name
// * An 'int': The action (likely less than 0x7F and thus a single-byte fixnum)
// * A 'str': The group name
// * A 'str': The connection Id
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 5);
MessagePackBinary.WriteInt32(writer, command.Id);
MessagePackBinary.WriteString(writer, command.ServerName);
MessagePackBinary.WriteByte(writer, (byte)command.Action);
MessagePackBinary.WriteString(writer, command.GroupName);
MessagePackBinary.WriteString(writer, command.ConnectionId);
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public byte[] WriteAck(int messageId)
{
// Written as a MessagePack 'arr' containing at least these items:
// * An 'int': The Id of the command being acknowledged.
// Any additional items are discarded.
var writer = MemoryBufferWriter.Get();
try
{
MessagePackBinary.WriteArrayHeader(writer, 1);
MessagePackBinary.WriteInt32(writer, messageId);
return writer.ToArray();
}
finally
{
MemoryBufferWriter.Return(writer);
}
}
public RedisInvocation ReadInvocation(ReadOnlyMemory<byte> data)
{
// See WriteInvocation for the format
ValidateArraySize(ref data, 2, "Invocation");
// Read excluded Ids
IReadOnlyList<string> excludedConnectionIds = null;
var idCount = MessagePackUtil.ReadArrayHeader(ref data);
if (idCount > 0)
{
var ids = new string[idCount];
for (var i = 0; i < idCount; i++)
{
ids[i] = MessagePackUtil.ReadString(ref data);
}
excludedConnectionIds = ids;
}
// Read payload
var message = ReadSerializedHubMessage(ref data);
return new RedisInvocation(message, excludedConnectionIds);
}
public RedisGroupCommand ReadGroupCommand(ReadOnlyMemory<byte> data)
{
// See WriteGroupCommand for format.
ValidateArraySize(ref data, 5, "GroupCommand");
var id = MessagePackUtil.ReadInt32(ref data);
var serverName = MessagePackUtil.ReadString(ref data);
var action = (GroupAction)MessagePackUtil.ReadByte(ref data);
var groupName = MessagePackUtil.ReadString(ref data);
var connectionId = MessagePackUtil.ReadString(ref data);
return new RedisGroupCommand(id, serverName, action, groupName, connectionId);
}
public int ReadAck(ReadOnlyMemory<byte> data)
{
// See WriteAck for format
ValidateArraySize(ref data, 1, "Ack");
return MessagePackUtil.ReadInt32(ref data);
}
private void WriteSerializedHubMessage(Stream stream, SerializedHubMessage message)
{
// Written as a MessagePack 'map' where the keys are the name of the protocol (as a MessagePack 'str')
// and the values are the serialized blob (as a MessagePack 'bin').
MessagePackBinary.WriteMapHeader(stream, _protocols.Count);
foreach (var protocol in _protocols)
{
MessagePackBinary.WriteString(stream, protocol.Name);
var serialized = message.GetSerializedMessage(protocol);
var isArray = MemoryMarshal.TryGetArray(serialized, out var array);
Debug.Assert(isArray);
MessagePackBinary.WriteBytes(stream, array.Array, array.Offset, array.Count);
}
}
public static SerializedHubMessage ReadSerializedHubMessage(ref ReadOnlyMemory<byte> data)
{
var count = MessagePackUtil.ReadMapHeader(ref data);
var serializations = new SerializedMessage[count];
for (var i = 0; i < count; i++)
{
var protocol = MessagePackUtil.ReadString(ref data);
var serialized = MessagePackUtil.ReadBytes(ref data);
serializations[i] = new SerializedMessage(protocol, serialized);
}
return new SerializedHubMessage(serializations);
}
private static void ValidateArraySize(ref ReadOnlyMemory<byte> data, int expectedLength, string messageType)
{
var length = MessagePackUtil.ReadArrayHeader(ref data);
if (length < expectedLength)
{
throw new InvalidDataException($"Insufficient items in {messageType} array.");
}
}
}
}

View File

@ -0,0 +1,63 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal
{
internal class RedisSubscriptionManager
{
private readonly ConcurrentDictionary<string, HubConnectionStore> _subscriptions = new ConcurrentDictionary<string, HubConnectionStore>(StringComparer.Ordinal);
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
public async Task AddSubscriptionAsync(string id, HubConnectionContext connection, Func<string, HubConnectionStore, Task> subscribeMethod)
{
await _lock.WaitAsync();
try
{
var subscription = _subscriptions.GetOrAdd(id, _ => new HubConnectionStore());
subscription.Add(connection);
// Subscribe once
if (subscription.Count == 1)
{
await subscribeMethod(id, subscription);
}
}
finally
{
_lock.Release();
}
}
public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connection, Func<string, Task> unsubscribeMethod)
{
await _lock.WaitAsync();
try
{
if (!_subscriptions.TryGetValue(id, out var subscription))
{
return;
}
subscription.Remove(connection);
if (subscription.Count == 0)
{
_subscriptions.TryRemove(id, out _);
await unsubscribeMethod(id);
}
}
finally
{
_lock.Release();
}
}
}
}

View File

@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Provides scale-out support for ASP.NET Core SignalR using a Redis server and the StackExchange.Redis client.</Description>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Common\JsonUtils.cs" Link="Internal\JsonUtils.cs" />
<Compile Include="..\Common\MemoryBufferWriter.cs" Link="Internal\MemoryBufferWriter.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" Version="$(MicrosoftExtensionsOptionsPackageVersion)" />
<PackageReference Include="StackExchange.Redis" Version="$(StackExchangeRedisPackageVersion)" />
<PackageReference Include="MessagePack" Version="$(MessagePackPackageVersion)" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Core\Microsoft.AspNetCore.SignalR.Core.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,69 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.StackExchangeRedis;
using StackExchange.Redis;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Extension methods for configuring Redis-based scale-out for a SignalR Server in an <see cref="ISignalRServerBuilder" />.
/// </summary>
public static class StackExchangeRedisDependencyInjectionExtensions
{
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddStackExchangeRedis(this ISignalRServerBuilder signalrBuilder)
{
return AddStackExchangeRedis(signalrBuilder, o => { });
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="redisConnectionString">The connection string used to connect to the Redis server.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddStackExchangeRedis(this ISignalRServerBuilder signalrBuilder, string redisConnectionString)
{
return AddStackExchangeRedis(signalrBuilder, o =>
{
o.Configuration = ConfigurationOptions.Parse(redisConnectionString);
});
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="configure">A callback to configure the Redis options.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddStackExchangeRedis(this ISignalRServerBuilder signalrBuilder, Action<RedisOptions> configure)
{
signalrBuilder.Services.Configure(configure);
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(RedisHubLifetimeManager<>));
return signalrBuilder;
}
/// <summary>
/// Adds scale-out to a <see cref="ISignalRServerBuilder"/>, using a shared Redis server.
/// </summary>
/// <param name="signalrBuilder">The <see cref="ISignalRServerBuilder"/>.</param>
/// <param name="redisConnectionString">The connection string used to connect to the Redis server.</param>
/// <param name="configure">A callback to configure the Redis options.</param>
/// <returns>The same instance of the <see cref="ISignalRServerBuilder"/> for chaining.</returns>
public static ISignalRServerBuilder AddStackExchangeRedis(this ISignalRServerBuilder signalrBuilder, string redisConnectionString, Action<RedisOptions> configure)
{
return AddStackExchangeRedis(signalrBuilder, o =>
{
o.Configuration = ConfigurationOptions.Parse(redisConnectionString);
configure(o);
});
}
}
}

View File

@ -0,0 +1,593 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis
{
public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable where THub : Hub
{
private readonly HubConnectionStore _connections = new HubConnectionStore();
private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
private IConnectionMultiplexer _redisServerConnection;
private ISubscriber _bus;
private readonly ILogger _logger;
private readonly RedisOptions _options;
private readonly RedisChannels _channels;
private readonly string _serverName = GenerateServerName();
private readonly RedisProtocol _protocol;
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);
private readonly AckHandler _ackHandler;
private int _internalId;
public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
IOptions<RedisOptions> options,
IHubProtocolResolver hubProtocolResolver)
{
_logger = logger;
_options = options.Value;
_ackHandler = new AckHandler();
_channels = new RedisChannels(typeof(THub).FullName);
_protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);
RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName);
_ = EnsureRedisServerConnection();
}
public override async Task OnConnectedAsync(HubConnectionContext connection)
{
await EnsureRedisServerConnection();
var feature = new RedisFeature();
connection.Features.Set<IRedisFeature>(feature);
var connectionTask = Task.CompletedTask;
var userTask = Task.CompletedTask;
_connections.Add(connection);
connectionTask = SubscribeToConnection(connection);
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
userTask = SubscribeToUser(connection);
}
await Task.WhenAll(connectionTask, userTask);
}
public override Task OnDisconnectedAsync(HubConnectionContext connection)
{
_connections.Remove(connection);
var tasks = new List<Task>();
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Unsubscribe(_logger, connectionChannel);
tasks.Add(_bus.UnsubscribeAsync(connectionChannel));
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
if (groupNames != null)
{
// Copy the groups to an array here because they get removed from this collection
// in RemoveFromGroupAsync
foreach (var group in groupNames.ToArray())
{
// Use RemoveGroupAsyncCore because the connection is local and we don't want to
// accidentally go to other servers with our remove request.
tasks.Add(RemoveGroupAsyncCore(connection, group));
}
}
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
tasks.Add(RemoveUserAsync(connection));
}
return Task.WhenAll(tasks);
}
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.All, message);
}
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.All, message);
}
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
// If the connection is local we can skip sending the message through the bus since we require sticky connections.
// This also saves serializing and deserializing the message!
var connection = _connections[connectionId];
if (connection != null)
{
return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Connection(connectionId), message);
}
public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.User(userId), message);
}
public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return AddGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
}
public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return RemoveGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
}
public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionIds == null)
{
throw new ArgumentNullException(nameof(connectionIds));
}
var publishTasks = new List<Task>(connectionIds.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var connectionId in connectionIds)
{
publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
}
return Task.WhenAll(publishTasks);
}
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupNames == null)
{
throw new ArgumentNullException(nameof(groupNames));
}
var publishTasks = new List<Task>(groupNames.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var groupName in groupNames)
{
if (!string.IsNullOrEmpty(groupName))
{
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
}
}
return Task.WhenAll(publishTasks);
}
public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (userIds.Count > 0)
{
var payload = _protocol.WriteInvocation(methodName, args);
var publishTasks = new List<Task>(userIds.Count);
foreach (var userId in userIds)
{
if (!string.IsNullOrEmpty(userId))
{
publishTasks.Add(PublishAsync(_channels.User(userId), payload));
}
}
return Task.WhenAll(publishTasks);
}
return Task.CompletedTask;
}
private async Task PublishAsync(string channel, byte[] payload)
{
await EnsureRedisServerConnection();
RedisLog.PublishToChannel(_logger, channel);
await _bus.PublishAsync(channel, payload);
}
private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
lock (groupNames)
{
// Connection already in group
if (!groupNames.Add(groupName))
{
return Task.CompletedTask;
}
}
var groupChannel = _channels.Group(groupName);
return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync);
}
/// <summary>
/// This takes <see cref="HubConnectionContext"/> because we want to remove the connection from the
/// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
/// </summary>
private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var groupChannel = _channels.Group(groupName);
await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
var feature = connection.Features.Get<IRedisFeature>();
var groupNames = feature.Groups;
if (groupNames != null)
{
lock (groupNames)
{
groupNames.Remove(groupName);
}
}
}
private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
{
var id = Interlocked.Increment(ref _internalId);
var ack = _ackHandler.CreateAck(id);
// Send Add/Remove Group to other servers and wait for an ack or timeout
var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId));
await PublishAsync(_channels.GroupManagement, message);
await ack;
}
private Task RemoveUserAsync(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.RemoveSubscriptionAsync(userChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
}
public void Dispose()
{
_bus?.UnsubscribeAll();
_redisServerConnection?.Dispose();
_ackHandler.Dispose();
}
private async Task SubscribeToAll()
{
RedisLog.Subscribing(_logger, _channels.All);
var channel = await _bus.SubscribeAsync(_channels.All);
channel.OnMessage(async channelMessage =>
{
try
{
RedisLog.ReceivedFromChannel(_logger, _channels.All);
var invocation = _protocol.ReadInvocation((byte[])channelMessage.Message);
var tasks = new List<Task>(_connections.Count);
foreach (var connection in _connections)
{
if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId))
{
tasks.Add(connection.WriteAsync(invocation.Message).AsTask());
}
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private async Task SubscribeToGroupManagementChannel()
{
var channel = await _bus.SubscribeAsync(_channels.GroupManagement);
channel.OnMessage(async channelMessage =>
{
try
{
var groupMessage = _protocol.ReadGroupCommand((byte[])channelMessage.Message);
var connection = _connections[groupMessage.ConnectionId];
if (connection == null)
{
// user not on this server
return;
}
if (groupMessage.Action == GroupAction.Remove)
{
await RemoveGroupAsyncCore(connection, groupMessage.GroupName);
}
if (groupMessage.Action == GroupAction.Add)
{
await AddGroupAsyncCore(connection, groupMessage.GroupName);
}
// Send an ack to the server that sent the original command.
await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id));
}
catch (Exception ex)
{
RedisLog.InternalMessageFailed(_logger, ex);
}
});
}
private async Task SubscribeToAckChannel()
{
// Create server specific channel in order to send an ack to a single server
var channel = await _bus.SubscribeAsync(_channels.Ack(_serverName));
channel.OnMessage(channelMessage =>
{
var ackId = _protocol.ReadAck((byte[])channelMessage.Message);
_ackHandler.TriggerAck(ackId);
});
}
private async Task SubscribeToConnection(HubConnectionContext connection)
{
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Subscribing(_logger, connectionChannel);
var channel = await _bus.SubscribeAsync(connectionChannel);
channel.OnMessage(channelMessage =>
{
var invocation = _protocol.ReadInvocation((byte[])channelMessage.Message);
return connection.WriteAsync(invocation.Message).AsTask();
});
}
private Task SubscribeToUser(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.AddSubscriptionAsync(userChannel, connection, async (channelName, subscriptions) =>
{
RedisLog.Subscribing(_logger, channelName);
var channel = await _bus.SubscribeAsync(channelName);
channel.OnMessage(async channelMessage =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])channelMessage.Message);
var tasks = new List<Task>();
foreach (var userConnection in subscriptions)
{
tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
});
}
private async Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
{
RedisLog.Subscribing(_logger, groupChannel);
var channel = await _bus.SubscribeAsync(groupChannel);
channel.OnMessage(async (channelMessage) =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])channelMessage.Message);
var tasks = new List<Task>();
foreach (var groupConnection in groupConnections)
{
if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true)
{
continue;
}
tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private async Task EnsureRedisServerConnection()
{
if (_redisServerConnection == null)
{
await _connectionLock.WaitAsync();
try
{
if (_redisServerConnection == null)
{
var writer = new LoggerTextWriter(_logger);
_redisServerConnection = await _options.ConnectAsync(writer);
_bus = _redisServerConnection.GetSubscriber();
_redisServerConnection.ConnectionRestored += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionRestored(_logger);
};
_redisServerConnection.ConnectionFailed += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionFailed(_logger, e.Exception);
};
if (_redisServerConnection.IsConnected)
{
RedisLog.Connected(_logger);
}
else
{
RedisLog.NotConnected(_logger);
}
await SubscribeToAll();
await SubscribeToGroupManagementChannel();
await SubscribeToAckChannel();
}
}
finally
{
_connectionLock.Release();
}
}
}
private static string GenerateServerName()
{
// Use the machine name for convenient diagnostics, but add a guid to make it unique.
// Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29
return $"{Environment.MachineName}_{Guid.NewGuid():N}";
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
public LoggerTextWriter(ILogger logger)
{
_logger = logger;
}
public override Encoding Encoding => Encoding.UTF8;
public override void Write(char value)
{
}
public override void WriteLine(string value)
{
RedisLog.ConnectionMultiplexerMessage(_logger, value);
}
}
private interface IRedisFeature
{
HashSet<string> Groups { get; }
}
private class RedisFeature : IRedisFeature
{
public HashSet<string> Groups { get; } = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
}
}
}

View File

@ -0,0 +1,50 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis
{
/// <summary>
/// Options used to configure <see cref="RedisHubLifetimeManager{THub}"/>.
/// </summary>
public class RedisOptions
{
/// <summary>
/// Gets or sets configuration options exposed by <c>StackExchange.Redis</c>.
/// </summary>
public ConfigurationOptions Configuration { get; set; } = new ConfigurationOptions
{
// Enable reconnecting by default
AbortOnConnectFail = false
};
/// <summary>
/// Gets or sets the Redis connection factory.
/// </summary>
public Func<TextWriter, Task<IConnectionMultiplexer>> ConnectionFactory { get; set; }
internal async Task<IConnectionMultiplexer> ConnectAsync(TextWriter log)
{
// Factory is publically settable. Assigning to a local variable before null check for thread safety.
var factory = ConnectionFactory;
if (factory == null)
{
// REVIEW: Should we do this?
if (Configuration.EndPoints.Count == 0)
{
Configuration.EndPoints.Add(IPAddress.Loopback, 0);
Configuration.SetDefaultPorts();
}
return await ConnectionMultiplexer.ConnectAsync(Configuration, log);
}
return await factory(log);
}
}
}

View File

@ -15,8 +15,8 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
private static readonly string _exeSuffix = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string _dockerContainerName = "redisTestContainer";
private static readonly string _dockerMonitorContainerName = _dockerContainerName + "Monitor";
private static readonly string _dockerContainerName = "redisTestContainer-1x";
private static readonly string _dockerMonitorContainerName = _dockerContainerName + "Monitor-1x";
private static readonly Lazy<Docker> _instance = new Lazy<Docker>(Create);
public static Docker Default => _instance.Value;
@ -82,7 +82,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
// use static name 'redisTestContainer' so if the container doesn't get removed we don't keep adding more
// use redis base docker image
// 20 second timeout to allow redis image to be downloaded, should be a rare occurrence, only happening when a new version is released
RunProcessAndThrowIfFailed(_path, $"run --rm -p 6379:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20));
RunProcessAndThrowIfFailed(_path, $"run --rm -p 6380:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20));
// inspect the redis docker image and extract the IPAddress. Necessary when running tests from inside a docker container, spinning up a new docker container for redis
// outside the current container requires linking the networks (difficult to automate) or using the IP:Port combo
@ -90,7 +90,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
output = output.Trim().Replace(Environment.NewLine, "");
// variable used by Startup.cs
Environment.SetEnvironmentVariable("REDIS_CONNECTION", $"{output}:6379");
Environment.SetEnvironmentVariable("REDIS_CONNECTION-PREV", $"{output}:6379");
var (monitorProcess, monitorOutput) = RunProcess(_path, $"run -i --name {_dockerMonitorContainerName} --link {_dockerContainerName}:redis --rm redis redis-cli -h redis -p 6379", "redis monitor", logger);
monitorProcess.StandardInput.WriteLine("MONITOR");

View File

@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;

View File

@ -22,7 +22,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
{
// We start the servers before starting redis so we want to time them out ASAP
options.Configuration.ConnectTimeout = 1;
options.Configuration.EndPoints.Add(Environment.GetEnvironmentVariable("REDIS_CONNECTION"));
options.Configuration.EndPoints.Add(Environment.GetEnvironmentVariable("REDIS_CONNECTION-PREV"));
});
services.AddSingleton<IUserIdProvider, UserNameIdProvider>();

View File

@ -0,0 +1,192 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class Docker
{
private static readonly string _exeSuffix = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string _dockerContainerName = "redisTestContainer";
private static readonly string _dockerMonitorContainerName = _dockerContainerName + "Monitor";
private static readonly Lazy<Docker> _instance = new Lazy<Docker>(Create);
public static Docker Default => _instance.Value;
private readonly string _path;
public Docker(string path)
{
_path = path;
}
private static Docker Create()
{
var location = GetDockerLocation();
if (location == null)
{
return null;
}
var docker = new Docker(location);
docker.RunCommand("info --format '{{.OSType}}'", "docker info", out var output);
if (!string.Equals(output.Trim('\'', '"', '\r', '\n', ' '), "linux"))
{
Console.WriteLine($"'docker info' output: {output}");
return null;
}
return docker;
}
private static string GetDockerLocation()
{
// OSX + Docker + Redis don't play well together for some reason. We already have these tests covered on Linux and Windows
// So we are happy ignoring them on OSX
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
return null;
}
foreach (var dir in Environment.GetEnvironmentVariable("PATH").Split(Path.PathSeparator))
{
var candidate = Path.Combine(dir, "docker" + _exeSuffix);
if (File.Exists(candidate))
{
return candidate;
}
}
return null;
}
public void Start(ILogger logger)
{
logger.LogInformation("Starting docker container");
// stop container if there is one, could be from a previous test run, ignore failures
RunProcessAndWait(_path, $"stop {_dockerMonitorContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
RunProcessAndWait(_path, $"stop {_dockerContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var output);
// create and run docker container, remove automatically when stopped, map 6379 from the container to 6379 localhost
// use static name 'redisTestContainer' so if the container doesn't get removed we don't keep adding more
// use redis base docker image
// 20 second timeout to allow redis image to be downloaded, should be a rare occurrence, only happening when a new version is released
RunProcessAndThrowIfFailed(_path, $"run --rm -p 6379:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20));
// inspect the redis docker image and extract the IPAddress. Necessary when running tests from inside a docker container, spinning up a new docker container for redis
// outside the current container requires linking the networks (difficult to automate) or using the IP:Port combo
RunProcessAndWait(_path, "inspect --format=\"{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}\" " + _dockerContainerName, "docker ipaddress", logger, TimeSpan.FromSeconds(5), out output);
output = output.Trim().Replace(Environment.NewLine, "");
// variable used by Startup.cs
Environment.SetEnvironmentVariable("REDIS_CONNECTION", $"{output}:6379");
var (monitorProcess, monitorOutput) = RunProcess(_path, $"run -i --name {_dockerMonitorContainerName} --link {_dockerContainerName}:redis --rm redis redis-cli -h redis -p 6379", "redis monitor", logger);
monitorProcess.StandardInput.WriteLine("MONITOR");
monitorProcess.StandardInput.Flush();
}
public void Stop(ILogger logger)
{
// Get logs from Redis container before stopping the container
RunProcessAndThrowIfFailed(_path, $"logs {_dockerContainerName}", "docker logs", logger, TimeSpan.FromSeconds(5));
logger.LogInformation("Stopping docker container");
RunProcessAndWait(_path, $"stop {_dockerMonitorContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
RunProcessAndWait(_path, $"stop {_dockerContainerName}", "docker stop", logger, TimeSpan.FromSeconds(15), out var _);
}
public int RunCommand(string commandAndArguments, string prefix, out string output) =>
RunCommand(commandAndArguments, prefix, NullLogger.Instance, out output);
public int RunCommand(string commandAndArguments, string prefix, ILogger logger, out string output)
{
return RunProcessAndWait(_path, commandAndArguments, prefix, logger, TimeSpan.FromSeconds(5), out output);
}
private static void RunProcessAndThrowIfFailed(string fileName, string arguments, string prefix, ILogger logger, TimeSpan timeout)
{
var exitCode = RunProcessAndWait(fileName, arguments, prefix, logger, timeout, out var output);
if (exitCode != 0)
{
throw new Exception($"Command '{fileName} {arguments}' failed with exit code '{exitCode}'. Output:{Environment.NewLine}{output}");
}
}
private static int RunProcessAndWait(string fileName, string arguments, string prefix, ILogger logger, TimeSpan timeout, out string output)
{
var (process, lines) = RunProcess(fileName, arguments, prefix, logger);
if (!process.WaitForExit((int)timeout.TotalMilliseconds))
{
process.Close();
logger.LogError("Closing process '{processName}' because it is running longer than the configured timeout.", fileName);
}
// Need to WaitForExit without a timeout to guarantee the output stream has written everything
process.WaitForExit();
output = string.Join(Environment.NewLine, lines);
return process.ExitCode;
}
private static (Process, ConcurrentQueue<string>) RunProcess(string fileName, string arguments, string prefix, ILogger logger)
{
var process = new Process
{
StartInfo = new ProcessStartInfo
{
FileName = fileName,
Arguments = arguments,
UseShellExecute = false,
RedirectStandardError = true,
RedirectStandardOutput = true,
RedirectStandardInput = true
},
EnableRaisingEvents = true
};
var exitCode = 0;
var lines = new ConcurrentQueue<string>();
process.Exited += (_, __) => exitCode = process.ExitCode;
process.OutputDataReceived += (_, a) =>
{
LogIfNotNull(logger.LogInformation, $"'{prefix}' stdout: {{0}}", a.Data);
lines.Enqueue(a.Data);
};
process.ErrorDataReceived += (_, a) =>
{
LogIfNotNull(logger.LogError, $"'{prefix}' stderr: {{0}}", a.Data);
lines.Enqueue(a.Data);
};
process.Start();
process.BeginErrorReadLine();
process.BeginOutputReadLine();
return (process, lines);
}
private static void LogIfNotNull(Action<string, object[]> logger, string message, string data)
{
if (!string.IsNullOrEmpty(data))
{
logger(message, new[] { data });
}
}
}
}

View File

@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class EchoHub : Hub
{
public string Echo(string message)
{
return message;
}
public Task EchoGroup(string groupName, string message)
{
return Clients.Group(groupName).SendAsync("Echo", message);
}
public Task EchoUser(string userName, string message)
{
return Clients.User(userName).SendAsync("Echo", message);
}
public Task AddSelfToGroup(string groupName)
{
return Groups.AddToGroupAsync(Context.ConnectionId, groupName);
}
}
}

View File

@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<Content Include="..\xunit.runner.json" Link="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR\Microsoft.AspNetCore.SignalR.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />
<ProjectReference Include="..\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Specification.Tests\Microsoft.AspNetCore.SignalR.Specification.Tests.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftExtensionsDependencyInjectionPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsLoggingPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Testing" Version="$(MicrosoftExtensionsLoggingTestingPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,41 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class RedisDependencyInjectionExtensionsTests
{
// No need to go too deep with these tests, or we're just testing StackExchange.Redis again :). It's the one doing the parsing.
[Theory]
[InlineData("testredis.example.com", "testredis.example.com", 0, null, false)]
[InlineData("testredis.example.com:6380,ssl=True", "testredis.example.com", 6380, null, true)]
[InlineData("testredis.example.com:6380,password=hunter2,ssl=True", "testredis.example.com", 6380, "hunter2", true)]
public void AddRedisWithConnectionStringProperlyParsesOptions(string connectionString, string host, int port, string password, bool useSsl)
{
var services = new ServiceCollection();
services.AddSignalR().AddStackExchangeRedis(connectionString);
var provider = services.BuildServiceProvider();
var options = provider.GetService<IOptions<RedisOptions>>();
Assert.NotNull(options.Value);
Assert.NotNull(options.Value.Configuration);
Assert.Equal(password, options.Value.Configuration.Password);
Assert.Collection(options.Value.Configuration.EndPoints,
endpoint =>
{
var dnsEndpoint = Assert.IsType<DnsEndPoint>(endpoint);
Assert.Equal(host, dnsEndpoint.Host);
Assert.Equal(port, dnsEndpoint.Port);
});
Assert.Equal(useSsl, options.Value.Configuration.Ssl);
}
}
}

View File

@ -0,0 +1,198 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
// Disable running server tests in parallel so server logs can accurately be captured per test
[CollectionDefinition(Name, DisableParallelization = true)]
public class RedisEndToEndTestsCollection : ICollectionFixture<RedisServerFixture<Startup>>
{
public const string Name = nameof(RedisEndToEndTestsCollection);
}
[Collection(RedisEndToEndTestsCollection.Name)]
public class RedisEndToEndTests : VerifiableLoggedTest
{
private readonly RedisServerFixture<Startup> _serverFixture;
public RedisEndToEndTests(RedisServerFixture<Startup> serverFixture, ITestOutputHelper output) : base(output)
{
if (serverFixture == null)
{
throw new ArgumentNullException(nameof(serverFixture));
}
_serverFixture = serverFixture;
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task HubConnectionCanSendAndReceiveMessages(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(HubConnectionCanSendAndReceiveMessages)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory);
await connection.StartAsync().OrTimeout();
var str = await connection.InvokeAsync<string>("Echo", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", str);
await connection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task HubConnectionCanSendAndReceiveGroupMessages(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(HubConnectionCanSendAndReceiveGroupMessages)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory);
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory);
var tcs = new TaskCompletionSource<string>();
connection.On<string>("Echo", message => tcs.TrySetResult(message));
var tcs2 = new TaskCompletionSource<string>();
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
var groupName = $"TestGroup_{transportType}_{protocolName}_{Guid.NewGuid()}";
await secondConnection.StartAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.InvokeAsync("AddSelfToGroup", groupName).OrTimeout();
await secondConnection.InvokeAsync("AddSelfToGroup", groupName).OrTimeout();
await connection.InvokeAsync("EchoGroup", groupName, "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
Assert.Equal("Hello, World!", await tcs2.Task.OrTimeout());
await connection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory(Skip= "https://github.com/aspnet/SignalR/issues/3058")]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser(HttpTransportType transportType, string protocolName)
{
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var tcs = new TaskCompletionSource<string>();
connection.On<string>("Echo", message => tcs.TrySetResult(message));
var tcs2 = new TaskCompletionSource<string>();
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
await secondConnection.StartAsync().OrTimeout();
await connection.StartAsync().OrTimeout();
await connection.InvokeAsync("EchoUser", "userA", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
Assert.Equal("Hello, World!", await tcs2.Task.OrTimeout());
await connection.DisposeAsync().OrTimeout();
await secondConnection.DisposeAsync().OrTimeout();
}
}
[ConditionalTheory]
[SkipIfDockerNotPresent]
[MemberData(nameof(TransportTypesAndProtocolTypes))]
public async Task CanSendAndReceiveUserMessagesWhenOneConnectionWithUserDisconnects(HttpTransportType transportType, string protocolName)
{
// Regression test:
// When multiple connections from the same user were connected and one left, it used to unsubscribe from the user channel
// Now we keep track of users connections and only unsubscribe when no users are listening
using (StartVerifiableLog(out var loggerFactory, testName:
$"{nameof(CanSendAndReceiveUserMessagesWhenOneConnectionWithUserDisconnects)}_{transportType.ToString()}_{protocolName}"))
{
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
var firstConnection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, loggerFactory, userName: "userA");
var tcs = new TaskCompletionSource<string>();
firstConnection.On<string>("Echo", message => tcs.TrySetResult(message));
await secondConnection.StartAsync().OrTimeout();
await firstConnection.StartAsync().OrTimeout();
await secondConnection.DisposeAsync().OrTimeout();
await firstConnection.InvokeAsync("EchoUser", "userA", "Hello, World!").OrTimeout();
Assert.Equal("Hello, World!", await tcs.Task.OrTimeout());
await firstConnection.DisposeAsync().OrTimeout();
}
}
private static HubConnection CreateConnection(string url, HttpTransportType transportType, IHubProtocol protocol, ILoggerFactory loggerFactory, string userName = null)
{
var hubConnectionBuilder = new HubConnectionBuilder()
.WithLoggerFactory(loggerFactory)
.WithUrl(url, transportType, httpConnectionOptions =>
{
if (!string.IsNullOrEmpty(userName))
{
httpConnectionOptions.Headers["UserName"] = userName;
}
});
hubConnectionBuilder.Services.AddSingleton(protocol);
return hubConnectionBuilder.Build();
}
private static IEnumerable<HttpTransportType> TransportTypes()
{
if (TestHelpers.IsWebSocketsSupported())
{
yield return HttpTransportType.WebSockets;
}
yield return HttpTransportType.ServerSentEvents;
yield return HttpTransportType.LongPolling;
}
public static IEnumerable<object[]> TransportTypesAndProtocolTypes
{
get
{
foreach (var transport in TransportTypes())
{
yield return new object[] { transport, "json" };
if (transport != HttpTransportType.ServerSentEvents)
{
yield return new object[] { transport, "messagepack" };
}
}
}
}
}
}

View File

@ -0,0 +1,84 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.SignalR.Specification.Tests;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
// Add ScaleoutHubLifetimeManagerTests<TestRedisServer> back after https://github.com/aspnet/SignalR/issues/3088
public class RedisHubLifetimeManagerTests
{
public class TestObject
{
public string TestProperty { get; set; }
}
private RedisHubLifetimeManager<MyHub> CreateLifetimeManager(TestRedisServer server, MessagePackHubProtocolOptions messagePackOptions = null, JsonHubProtocolOptions jsonOptions = null)
{
var options = new RedisOptions() { ConnectionFactory = async (t) => await Task.FromResult(new TestConnectionMultiplexer(server)) };
messagePackOptions = messagePackOptions ?? new MessagePackHubProtocolOptions();
jsonOptions = jsonOptions ?? new JsonHubProtocolOptions();
return new RedisHubLifetimeManager<MyHub>(
NullLogger<RedisHubLifetimeManager<MyHub>>.Instance,
Options.Create(options),
new DefaultHubProtocolResolver(new IHubProtocol[]
{
new JsonHubProtocol(Options.Create(jsonOptions)),
new MessagePackHubProtocol(Options.Create(messagePackOptions)),
}, NullLogger<DefaultHubProtocolResolver>.Instance));
}
[Fact(Skip = "https://github.com/aspnet/SignalR/issues/3088")]
public async Task CamelCasedJsonIsPreservedAcrossRedisBoundary()
{
var server = new TestRedisServer();
var messagePackOptions = new MessagePackHubProtocolOptions();
var jsonOptions = new JsonHubProtocolOptions();
jsonOptions.PayloadSerializerSettings.ContractResolver = new CamelCasePropertyNamesContractResolver();
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
// The sending manager has serializer settings
var manager1 = CreateLifetimeManager(server, messagePackOptions, jsonOptions);
// The receiving one doesn't matter because of how we serialize!
var manager2 = CreateLifetimeManager(server);
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);
await manager1.OnConnectedAsync(connection1).OrTimeout();
await manager2.OnConnectedAsync(connection2).OrTimeout();
await manager1.SendAllAsync("Hello", new object[] { new TestObject { TestProperty = "Foo" } });
var message = Assert.IsType<InvocationMessage>(await client2.ReadAsync().OrTimeout());
Assert.Equal("Hello", message.Target);
Assert.Collection(
message.Arguments,
arg0 =>
{
var dict = Assert.IsType<JObject>(arg0);
Assert.Collection(dict.Properties(),
prop =>
{
Assert.Equal("testProperty", prop.Name);
Assert.Equal("Foo", prop.Value.Value<string>());
});
});
}
}
}
}

View File

@ -0,0 +1,202 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal;
using Microsoft.AspNetCore.SignalR.Tests;
using Xunit;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class RedisProtocolTests
{
private static Dictionary<string, ProtocolTestData<int>> _ackTestData = new[]
{
CreateTestData("Zero", 0, 0x91, 0x00),
CreateTestData("Fixnum", 42, 0x91, 0x2A),
CreateTestData("Uint8", 180, 0x91, 0xCC, 0xB4),
CreateTestData("Uint16", 384, 0x91, 0xCD, 0x01, 0x80),
CreateTestData("Uint32", 70_000, 0x91, 0xCE, 0x00, 0x01, 0x11, 0x70),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> AckTestData = _ackTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(AckTestData))]
public void ParseAck(string testName)
{
var testData = _ackTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var decoded = protocol.ReadAck(testData.Encoded);
Assert.Equal(testData.Decoded, decoded);
}
[Theory]
[MemberData(nameof(AckTestData))]
public void WriteAck(string testName)
{
var testData = _ackTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var encoded = protocol.WriteAck(testData.Decoded);
Assert.Equal(testData.Encoded, encoded);
}
private static Dictionary<string, ProtocolTestData<RedisGroupCommand>> _groupCommandTestData = new[]
{
CreateTestData("GroupAdd", new RedisGroupCommand(42, "S", GroupAction.Add, "G", "C" ), 0x95, 0x2A, 0xA1, (byte)'S', 0x01, 0xA1, (byte)'G', 0xA1, (byte)'C'),
CreateTestData("GroupRemove", new RedisGroupCommand(42, "S", GroupAction.Remove, "G", "C" ), 0x95, 0x2A, 0xA1, (byte)'S', 0x02, 0xA1, (byte)'G', 0xA1, (byte)'C'),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> GroupCommandTestData = _groupCommandTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(GroupCommandTestData))]
public void ParseGroupCommand(string testName)
{
var testData = _groupCommandTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var decoded = protocol.ReadGroupCommand(testData.Encoded);
Assert.Equal(testData.Decoded.Id, decoded.Id);
Assert.Equal(testData.Decoded.ServerName, decoded.ServerName);
Assert.Equal(testData.Decoded.Action, decoded.Action);
Assert.Equal(testData.Decoded.GroupName, decoded.GroupName);
Assert.Equal(testData.Decoded.ConnectionId, decoded.ConnectionId);
}
[Theory]
[MemberData(nameof(GroupCommandTestData))]
public void WriteGroupCommand(string testName)
{
var testData = _groupCommandTestData[testName];
var protocol = new RedisProtocol(Array.Empty<IHubProtocol>());
var encoded = protocol.WriteGroupCommand(testData.Decoded);
Assert.Equal(testData.Encoded, encoded);
}
// The actual invocation message doesn't matter
private static InvocationMessage _testMessage = new InvocationMessage("target", Array.Empty<object>());
// We use a func so we are guaranteed to get a new SerializedHubMessage for each test
private static Dictionary<string, ProtocolTestData<Func<RedisInvocation>>> _invocationTestData = new[]
{
CreateTestData<Func<RedisInvocation>>(
"NoExcludedIds",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), null),
0x92,
0x90,
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
CreateTestData<Func<RedisInvocation>>(
"OneExcludedId",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), new [] { "a" }),
0x92,
0x91,
0xA1, (byte)'a',
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
CreateTestData<Func<RedisInvocation>>(
"ManyExcludedIds",
() => new RedisInvocation(new SerializedHubMessage(_testMessage), new [] { "a", "b", "c", "d", "e", "f" }),
0x92,
0x96,
0xA1, (byte)'a',
0xA1, (byte)'b',
0xA1, (byte)'c',
0xA1, (byte)'d',
0xA1, (byte)'e',
0xA1, (byte)'f',
0x82,
0xA2, (byte)'p', (byte)'1',
0xC4, 0x01, 0x2A,
0xA2, (byte)'p', (byte)'2',
0xC4, 0x01, 0x2A),
}.ToDictionary(t => t.Name);
public static IEnumerable<object[]> InvocationTestData = _invocationTestData.Keys.Select(k => new object[] { k });
[Theory]
[MemberData(nameof(InvocationTestData))]
public void ParseInvocation(string testName)
{
var testData = _invocationTestData[testName];
var hubProtocols = new[] { new DummyHubProtocol("p1"), new DummyHubProtocol("p2") };
var protocol = new RedisProtocol(hubProtocols);
var expected = testData.Decoded();
var decoded = protocol.ReadInvocation(testData.Encoded);
Assert.Equal(expected.ExcludedConnectionIds, decoded.ExcludedConnectionIds);
// Verify the deserialized object has the necessary serialized forms
foreach (var hubProtocol in hubProtocols)
{
Assert.Equal(
expected.Message.GetSerializedMessage(hubProtocol).ToArray(),
decoded.Message.GetSerializedMessage(hubProtocol).ToArray());
var writtenMessages = hubProtocol.GetWrittenMessages();
Assert.Collection(writtenMessages,
actualMessage =>
{
var invocation = Assert.IsType<InvocationMessage>(actualMessage);
Assert.Same(_testMessage.Target, invocation.Target);
Assert.Same(_testMessage.Arguments, invocation.Arguments);
});
}
}
[Theory]
[MemberData(nameof(InvocationTestData))]
public void WriteInvocation(string testName)
{
var testData = _invocationTestData[testName];
var protocol = new RedisProtocol(new[] { new DummyHubProtocol("p1"), new DummyHubProtocol("p2") });
// Actual invocation doesn't matter because we're using a dummy hub protocol.
// But the dummy protocol will check that we gave it the test message to make sure everything flows through properly.
var expected = testData.Decoded();
var encoded = protocol.WriteInvocation(_testMessage.Target, _testMessage.Arguments, expected.ExcludedConnectionIds);
Assert.Equal(testData.Encoded, encoded);
}
// Create ProtocolTestData<T> using the Power of Type Inference(TM).
private static ProtocolTestData<T> CreateTestData<T>(string name, T decoded, params byte[] encoded)
=> new ProtocolTestData<T>(name, decoded, encoded);
public class ProtocolTestData<T>
{
public string Name { get; }
public T Decoded { get; }
public byte[] Encoded { get; }
public ProtocolTestData(string name, T decoded, byte[] encoded)
{
Name = name;
Decoded = decoded;
Encoded = encoded;
}
}
}
}

View File

@ -0,0 +1,64 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class RedisServerFixture<TStartup> : IDisposable
where TStartup : class
{
public ServerFixture<TStartup> FirstServer { get; private set; }
public ServerFixture<TStartup> SecondServer { get; private set; }
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly IDisposable _logToken;
public RedisServerFixture()
{
// Docker is not available on the machine, tests using this fixture
// should be using SkipIfDockerNotPresentAttribute and will be skipped.
if (Docker.Default == null)
{
return;
}
var testLog = AssemblyTestLog.ForAssembly(typeof(RedisServerFixture<TStartup>).Assembly);
_logToken = testLog.StartTestLog(null, $"{nameof(RedisServerFixture<TStartup>)}_{typeof(TStartup).Name}", out _loggerFactory, LogLevel.Trace, "RedisServerFixture");
_logger = _loggerFactory.CreateLogger<RedisServerFixture<TStartup>>();
Docker.Default.Start(_logger);
FirstServer = StartServer();
SecondServer = StartServer();
}
private ServerFixture<TStartup> StartServer()
{
try
{
return new ServerFixture<TStartup>(_loggerFactory);
}
catch (Exception ex)
{
_logger.LogError(ex, "Server failed to start.");
throw;
}
}
public void Dispose()
{
if (Docker.Default != null)
{
FirstServer.Dispose();
SecondServer.Dispose();
Docker.Default.Stop(_logger);
_logToken.Dispose();
}
}
}
}

View File

@ -0,0 +1,39 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Testing.xunit;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class SkipIfDockerNotPresentAttribute : Attribute, ITestCondition
{
public bool IsMet => CheckDocker();
public string SkipReason { get; private set; } = "Docker is not available";
private bool CheckDocker()
{
if (Docker.Default != null)
{
// Docker is present, but is it working?
if (Docker.Default.RunCommand("ps", "docker ps", out var output) != 0)
{
SkipReason = $"Failed to invoke test command 'docker ps'. Output: {output}";
}
else
{
// We have a docker
return true;
}
}
else
{
SkipReason = "Docker is not installed on the host machine.";
}
// If we get here, we don't have a docker
return false;
}
}
}

View File

@ -0,0 +1,51 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Primitives;
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSignalR(options =>
{
options.EnableDetailedErrors = true;
})
.AddMessagePackProtocol()
.AddStackExchangeRedis(options =>
{
// We start the servers before starting redis so we want to time them out ASAP
options.Configuration.ConnectTimeout = 1;
options.Configuration.EndPoints.Add(Environment.GetEnvironmentVariable("REDIS_CONNECTION"));
});
services.AddSingleton<IUserIdProvider, UserNameIdProvider>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseSignalR(options => options.MapHub<EchoHub>("/echo"));
}
private class UserNameIdProvider : IUserIdProvider
{
public string GetUserId(HubConnectionContext connection)
{
// This is an AWFUL way to authenticate users! We're just using it for test purposes.
var userNameHeader = connection.GetHttpContext().Request.Headers["UserName"];
if (!StringValues.IsNullOrEmpty(userNameHeader))
{
return userNameHeader;
}
return null;
}
}
}
}

View File

@ -0,0 +1,376 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using StackExchange.Redis;
using StackExchange.Redis.Profiling;
namespace Microsoft.AspNetCore.SignalR.Tests
{
public class TestConnectionMultiplexer : IConnectionMultiplexer
{
public string ClientName => throw new NotImplementedException();
public string Configuration => throw new NotImplementedException();
public int TimeoutMilliseconds => throw new NotImplementedException();
public long OperationCount => throw new NotImplementedException();
public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public bool IsConnected => true;
public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public bool IsConnecting => throw new NotImplementedException();
public event EventHandler<RedisErrorEventArgs> ErrorMessage
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed
{
add { }
remove { }
}
public event EventHandler<InternalErrorEventArgs> InternalError
{
add { }
remove { }
}
public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChanged
{
add { }
remove { }
}
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast
{
add { }
remove { }
}
public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
{
add { }
remove { }
}
private readonly ISubscriber _subscriber;
public TestConnectionMultiplexer(TestRedisServer server)
{
_subscriber = new TestSubscriber(server);
}
public void BeginProfiling(object forContext)
{
throw new NotImplementedException();
}
public void Close(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public Task CloseAsync(bool allowCommandsToComplete = true)
{
throw new NotImplementedException();
}
public bool Configure(TextWriter log = null)
{
throw new NotImplementedException();
}
public Task<bool> ConfigureAsync(TextWriter log = null)
{
throw new NotImplementedException();
}
public void Dispose()
{
throw new NotImplementedException();
}
public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
{
throw new NotImplementedException();
}
public ServerCounters GetCounters()
{
throw new NotImplementedException();
}
public IDatabase GetDatabase(int db = -1, object asyncState = null)
{
throw new NotImplementedException();
}
public EndPoint[] GetEndPoints(bool configuredOnly = false)
{
throw new NotImplementedException();
}
public IServer GetServer(string host, int port, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(string hostAndPort, object asyncState = null)
{
throw new NotImplementedException();
}
public IServer GetServer(IPAddress host, int port)
{
throw new NotImplementedException();
}
public IServer GetServer(EndPoint endpoint, object asyncState = null)
{
throw new NotImplementedException();
}
public string GetStatus()
{
throw new NotImplementedException();
}
public void GetStatus(TextWriter log)
{
throw new NotImplementedException();
}
public string GetStormLog()
{
throw new NotImplementedException();
}
public ISubscriber GetSubscriber(object asyncState = null)
{
return _subscriber;
}
public int HashSlot(RedisKey key)
{
throw new NotImplementedException();
}
public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public void ResetStormLog()
{
throw new NotImplementedException();
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider)
{
throw new NotImplementedException();
}
public int GetHashSlot(RedisKey key)
{
throw new NotImplementedException();
}
public void ExportConfiguration(Stream destination, ExportOptions options = (ExportOptions)(-1))
{
throw new NotImplementedException();
}
}
public class TestRedisServer
{
private readonly ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>> _subscriptions =
new ConcurrentDictionary<RedisChannel, List<Action<RedisChannel, RedisValue>>>();
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (_subscriptions.TryGetValue(channel, out var handlers))
{
foreach (var handler in handlers)
{
handler(channel, message);
}
}
return handlers != null ? handlers.Count : 0;
}
public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
_subscriptions.AddOrUpdate(channel, _ => new List<Action<RedisChannel, RedisValue>> { handler }, (_, list) =>
{
list.Add(handler);
return list;
});
}
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
if (_subscriptions.TryGetValue(channel, out var list))
{
list.Remove(handler);
}
}
}
public class TestSubscriber : ISubscriber
{
private readonly TestRedisServer _server;
public ConnectionMultiplexer Multiplexer => throw new NotImplementedException();
IConnectionMultiplexer IRedisAsync.Multiplexer => throw new NotImplementedException();
public TestSubscriber(TestRedisServer server)
{
_server = server;
}
public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public bool IsConnected(RedisChannel channel = default)
{
throw new NotImplementedException();
}
public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
return _server.Publish(channel, message, flags);
}
public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
await Task.Yield();
return Publish(channel, message, flags);
}
public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
_server.Subscribe(channel, handler, flags);
}
public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
{
Subscribe(channel, handler, flags);
return Task.CompletedTask;
}
public EndPoint SubscribedEndpoint(RedisChannel channel)
{
throw new NotImplementedException();
}
public bool TryWait(Task task)
{
throw new NotImplementedException();
}
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
_server.Unsubscribe(channel, handler, flags);
}
public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
{
Unsubscribe(channel, handler, flags);
return Task.CompletedTask;
}
public void Wait(Task task)
{
throw new NotImplementedException();
}
public T Wait<T>(Task<T> task)
{
throw new NotImplementedException();
}
public void WaitAll(params Task[] tasks)
{
throw new NotImplementedException();
}
public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
throw new NotImplementedException();
}
public Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
{
var t = Subscribe(channel, flags);
return Task.FromResult(t);
}
}
}

View File

@ -22,7 +22,6 @@
<PackageReference Include="Microsoft.Extensions.ValueStopwatch.Sources" Version="$(MicrosoftExtensionsValueStopwatchSourcesPackageVersion)" PrivateAssets="All" />
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="$(MicrosoftAspNetCoreHostingPackageVersion)" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(MicrosoftAspNetCoreServerKestrelPackageVersion)" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="$(StackExchangeRedisStrongNamePackageVersion)" />
</ItemGroup>
<ItemGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<VersionPrefix>3.0.0</VersionPrefix>
<VersionSuffix>alpha1</VersionSuffix>
<JavaVersionPrefix>0.1.0</JavaVersionPrefix>
<JavaVersionPrefix>1.0.0</JavaVersionPrefix>
<PackageVersion Condition="'$(IsFinalBuild)' == 'true' AND '$(VersionSuffix)' == 'rtm' ">$(VersionPrefix)</PackageVersion>
<PackageVersion Condition="'$(IsFinalBuild)' == 'true' AND '$(VersionSuffix)' != 'rtm' ">$(VersionPrefix)-$(VersionSuffix)-final</PackageVersion>
<BuildNumber Condition="'$(BuildNumber)' == ''">t000</BuildNumber>