Merge branch 'release/2.2'

This commit is contained in:
MikaelMengistu 2018-10-01 16:19:06 -07:00
commit c6a1c7463a
25 changed files with 767 additions and 297 deletions

View File

@ -68,7 +68,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier
await client.CreateAndStartConnectionAsync(targetAddress, transportType);
}
Log("Connections connected succesfully");
Log("Connections connected successfully");
}
public Task StartTestAsync(TimeSpan sendInterval, int sendBytes)
@ -80,7 +80,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier
client.StartTest(sendBytes, sendInterval);
}
Log("Test started succesfully");
Log("Test started successfully");
return Task.CompletedTask;
}
@ -98,7 +98,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier
}
_sendStatusCts.Cancel();
Log("Connections stopped succesfully");
Log("Connections stopped successfully");
_targetConnectionCount = 0;
return Task.CompletedTask;

View File

@ -0,0 +1,126 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
class DefaultHttpClient extends HttpClient {
private OkHttpClient client;
private Logger logger;
public DefaultHttpClient(Logger logger) {
this.logger = logger;
this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock();
@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
cookieLock.lock();
try {
for (Cookie cookie : cookies) {
boolean replacedCookie = false;
for (int i = 0; i < cookieList.size(); i++) {
Cookie innerCookie = cookieList.get(i);
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
// We have a new cookie that matches an older one so we replace the older one.
cookieList.set(i, innerCookie);
replacedCookie = true;
break;
}
}
if (!replacedCookie) {
cookieList.add(cookie);
}
}
} finally {
cookieLock.unlock();
}
}
@Override
public List<Cookie> loadForRequest(HttpUrl url) {
cookieLock.lock();
try {
List<Cookie> matchedCookies = new ArrayList<>();
List<Cookie> expiredCookies = new ArrayList<>();
for (Cookie cookie : cookieList) {
if (cookie.expiresAt() < System.currentTimeMillis()) {
expiredCookies.add(cookie);
} else if (cookie.matches(url)) {
matchedCookies.add(cookie);
}
}
cookieList.removeAll(expiredCookies);
return matchedCookies;
} finally {
cookieLock.unlock();
}
}
}).build();
}
@Override
public CompletableFuture<HttpResponse> send(HttpRequest httpRequest) {
Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl());
if (httpRequest.getMethod() == "GET") {
requestBuilder.get();
} else if (httpRequest.getMethod() == "POST") {
RequestBody body = RequestBody.create(null, new byte[] {});
requestBuilder.post(body);
} else if (httpRequest.getMethod() == "DELETE") {
requestBuilder.delete();
}
if (httpRequest.getHeaders() != null) {
httpRequest.getHeaders().forEach((key, value) -> {
requestBuilder.addHeader(key, value);
});
}
Request request = requestBuilder.build();
CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
responseFuture.completeExceptionally(e.getCause());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
try (ResponseBody body = response.body()) {
HttpResponse httpResponse = new HttpResponse(response.code(), response.message(), body.string());
responseFuture.complete(httpResponse);
}
}
});
return responseFuture;
}
@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
return new OkHttpWebSocketWrapper(url, headers, client, logger);
}
}

View File

@ -6,7 +6,7 @@ package com.microsoft.aspnet.signalr;
import com.google.gson.Gson;
class HandshakeProtocol {
public static Gson gson = new Gson();
private static Gson gson = new Gson();
private static final String RECORD_SEPARATOR = "\u001e";
public static String createHandshakeRequestMessage(HandshakeRequestMessage message) {

View File

@ -0,0 +1,122 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
class HttpRequest {
private String method;
private String url;
private Map<String, String> headers = new HashMap<>();
public void setMethod(String method) {
this.method = method;
}
public void setUrl(String url) {
this.url = url;
}
public void setHeader(String key, String value) {
this.headers.put(key, value);
}
public void setHeaders(Map<String, String> headers) {
headers.forEach((key, value) -> {
this.headers.put(key, value);
});
}
public String getMethod() {
return method;
}
public String getUrl() {
return url;
}
public Map<String, String> getHeaders() {
return headers;
}
}
class HttpResponse {
private int statusCode;
private String statusText;
private String content = null;
public HttpResponse(int statusCode) {
this.statusCode = statusCode;
}
public HttpResponse(int statusCode, String statusText) {
this.statusCode = statusCode;
this.statusText = statusText;
}
public HttpResponse(int statusCode, String statusText, String content) {
this.statusCode = statusCode;
this.statusText = statusText;
this.content = content;
}
public String getContent() {
return content;
}
public int getStatusCode() {
return statusCode;
}
public String getStatusText() {
return statusText;
}
}
abstract class HttpClient {
public CompletableFuture<HttpResponse> get(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("GET");
return this.send(request);
}
public CompletableFuture<HttpResponse> get(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("GET");
return this.send(options);
}
public CompletableFuture<HttpResponse> post(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("POST");
return this.send(request);
}
public CompletableFuture<HttpResponse> post(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("POST");
return this.send(options);
}
public CompletableFuture<HttpResponse> delete(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("DELETE");
return this.send(request);
}
public CompletableFuture<HttpResponse> delete(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("DELETE");
return this.send(options);
}
public abstract CompletableFuture<HttpResponse> send(HttpRequest request);
public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers);
}

View File

@ -4,18 +4,18 @@
package com.microsoft.aspnet.signalr;
import java.io.IOException;
import java.util.*;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
public class HubConnection {
private String url;
private Transport transport;
@ -29,16 +29,15 @@ public class HubConnection {
private Logger logger;
private List<Consumer<Exception>> onClosedCallbackList;
private boolean skipNegotiate = false;
private NegotiateResponse negotiateResponse;
private String accessToken;
private Map<String, String> headers = new HashMap<>();
private ConnectionState connectionState = null;
private OkHttpClient httpClient;
private HttpClient httpClient;
private static ArrayList<Class<?>> emptyArray = new ArrayList<>();
private static int MAX_NEGOTIATE_ATTEMPTS = 100;
public HubConnection(String url, Transport transport, Logger logger, boolean skipNegotiate) {
public HubConnection(String url, Transport transport, Logger logger, boolean skipNegotiate, HttpClient client) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
}
@ -52,64 +51,18 @@ public class HubConnection {
this.logger = new NullLogger();
}
if (client != null) {
this.httpClient = client;
} else {
this.httpClient = new DefaultHttpClient(this.logger);
}
if (transport != null) {
this.transport = transport;
}
this.skipNegotiate = skipNegotiate;
this.httpClient = new OkHttpClient.Builder()
.cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock();
@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
cookieLock.lock();
try {
for (Cookie cookie : cookies) {
boolean replacedCookie = false;
for (int i = 0; i < cookieList.size(); i++) {
Cookie innerCookie = cookieList.get(i);
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
// We have a new cookie that matches an older one so we replace the older one.
cookieList.set(i, innerCookie);
replacedCookie = true;
break;
}
}
if (!replacedCookie) {
cookieList.add(cookie);
}
}
} finally {
cookieLock.unlock();
}
}
@Override
public List<Cookie> loadForRequest(HttpUrl url) {
cookieLock.lock();
try {
List<Cookie> matchedCookies = new ArrayList<>();
List<Cookie> expiredCookies = new ArrayList<>();
for (Cookie cookie : cookieList) {
if (cookie.expiresAt() < System.currentTimeMillis()) {
expiredCookies.add(cookie);
} else if (cookie.matches(url)) {
matchedCookies.add(cookie);
}
}
cookieList.removeAll(expiredCookies);
return matchedCookies;
} finally {
cookieLock.unlock();
}
}
})
.build();
this.callback = (payload) -> {
if (!handshakeReceived) {
@ -174,30 +127,40 @@ public class HubConnection {
};
}
private NegotiateResponse handleNegotiate() throws IOException, HubException {
accessToken = (negotiateResponse == null) ? null : negotiateResponse.getAccessToken();
negotiateResponse = Negotiate.processNegotiate(url, httpClient, accessToken);
private CompletableFuture<NegotiateResponse> handleNegotiate() throws IOException, InterruptedException, ExecutionException {
HttpRequest request = new HttpRequest();
request.setHeaders(this.headers);
if (negotiateResponse.getError() != null) {
throw new HubException(negotiateResponse.getError());
}
if (negotiateResponse.getConnectionId() != null) {
if (url.contains("?")) {
url = url + "&id=" + negotiateResponse.getConnectionId();
} else {
url = url + "?id=" + negotiateResponse.getConnectionId();
return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).thenCompose((response) -> {
NegotiateResponse negotiateResponse;
try {
negotiateResponse = new NegotiateResponse(response.getContent());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (negotiateResponse.getAccessToken() != null) {
this.headers.put("Authorization", "Bearer " + negotiateResponse.getAccessToken());
}
if (negotiateResponse.getError() != null) {
throw new RuntimeException(negotiateResponse.getError());
}
if (negotiateResponse.getRedirectUrl() != null) {
this.url = this.negotiateResponse.getRedirectUrl();
}
if (negotiateResponse.getConnectionId() != null) {
if (url.contains("?")) {
url = url + "&id=" + negotiateResponse.getConnectionId();
} else {
url = url + "?id=" + negotiateResponse.getConnectionId();
}
}
return negotiateResponse;
if (negotiateResponse.getAccessToken() != null) {
this.headers.put("Authorization", "Bearer " + negotiateResponse.getAccessToken());
}
if (negotiateResponse.getRedirectUrl() != null) {
this.url = negotiateResponse.getRedirectUrl();
}
return CompletableFuture.completedFuture(negotiateResponse);
});
}
/**
@ -214,53 +177,96 @@ public class HubConnection {
*
* @throws Exception An error occurred while connecting.
*/
public CompletableFuture start() throws Exception {
public CompletableFuture<Void> start() throws Exception {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<NegotiateResponse> negotiate = null;
if (!skipNegotiate) {
int negotiateAttempts = 0;
do {
accessToken = (negotiateResponse == null) ? null : negotiateResponse.getAccessToken();
negotiateResponse = handleNegotiate();
negotiateAttempts++;
} while (negotiateResponse.getRedirectUrl() != null && negotiateAttempts < MAX_NEGOTIATE_ATTEMPTS);
if (!negotiateResponse.getAvailableTransports().contains("WebSockets")) {
throw new HubException("There were no compatible transports on the server.");
negotiate = startNegotiate(0);
} else {
negotiate = CompletableFuture.completedFuture(null);
}
return negotiate.thenCompose((response) -> {
// If we didn't skip negotiate and got a null response then exit start because we
// are probably disconnected
if (response == null && !skipNegotiate) {
return CompletableFuture.completedFuture(null);
}
}
logger.log(LogLevel.Debug, "Starting HubConnection");
if (transport == null) {
transport = new WebSocketTransport(url, logger, headers, httpClient);
}
transport.setOnReceive(this.callback);
return transport.start().thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
hubConnectionStateLock.lock();
logger.log(LogLevel.Debug, "Starting HubConnection");
if (transport == null) {
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnected started.");
} finally {
hubConnectionStateLock.unlock();
transport = new WebSocketTransport(url, headers, httpClient, logger);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
});
});
}
transport.setOnReceive(this.callback);
try {
return transport.start().thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
} finally {
hubConnectionStateLock.unlock();
}
});
});
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private CompletableFuture<NegotiateResponse> startNegotiate(int negotiateAttempts) throws IOException, InterruptedException, ExecutionException {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
}
return handleNegotiate().thenCompose((response) -> {
if (response.getRedirectUrl() != null && negotiateAttempts >= MAX_NEGOTIATE_ATTEMPTS) {
throw new RuntimeException("Negotiate redirection limit exceeded.");
}
if (response.getRedirectUrl() == null) {
if (!response.getAvailableTransports().contains("WebSockets")) {
try {
throw new HubException("There were no compatible transports on the server.");
} catch (HubException e) {
throw new RuntimeException(e);
}
}
return CompletableFuture.completedFuture(response);
}
try {
return startNegotiate(negotiateAttempts + 1);
} catch (IOException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
/**
* Stops a connection to the server.
*/
private void stop(String errorMessage) {
HubException hubException = null;
private CompletableFuture<Void> stop(String errorMessage) {
hubConnectionStateLock.lock();
try {
if (hubConnectionState == HubConnectionState.DISCONNECTED) {
return;
return CompletableFuture.completedFuture(null);
}
if (errorMessage != null) {
@ -268,32 +274,42 @@ public class HubConnection {
} else {
logger.log(LogLevel.Debug, "Stopping HubConnection.");
}
transport.stop();
hubConnectionState = HubConnectionState.DISCONNECTED;
if (errorMessage != null) {
hubException = new HubException(errorMessage);
}
connectionState.cancelOutstandingInvocations(hubException);
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
} finally {
hubConnectionStateLock.unlock();
}
if (onClosedCallbackList != null) {
for (Consumer<Exception> callback : onClosedCallbackList) {
callback.accept(hubException);
return transport.stop().whenComplete((i, t) -> {
HubException hubException = null;
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.DISCONNECTED;
if (errorMessage != null) {
hubException = new HubException(errorMessage);
} else if (t != null) {
hubException = new HubException(t.getMessage());
}
connectionState.cancelOutstandingInvocations(hubException);
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
} finally {
hubConnectionStateLock.unlock();
}
}
// Do not run these callbacks inside the hubConnectionStateLock
if (onClosedCallbackList != null) {
for (Consumer<Exception> callback : onClosedCallbackList) {
callback.accept(hubException);
}
}
});
}
/**
* Stops a connection to the server.
*/
public void stop() {
stop(null);
public CompletableFuture<Void> stop() {
return stop(null);
}
/**

View File

@ -8,6 +8,7 @@ public class HubConnectionBuilder {
private Transport transport;
private Logger logger;
private boolean skipNegotiate;
private HttpClient client;
public HubConnectionBuilder withUrl(String url) {
if (url == null || url.isEmpty()) {
@ -49,10 +50,16 @@ public class HubConnectionBuilder {
return this;
}
// For testing purposes only
HubConnectionBuilder configureHttpClient(HttpClient client) {
this.client = client;
return this;
}
public HubConnection build() {
if (this.url == null) {
throw new RuntimeException("The 'HubConnectionBuilder.withUrl' method must be called before building the connection.");
}
return new HubConnection(url, transport, logger, skipNegotiate);
return new HubConnection(url, transport, logger, skipNegotiate, client);
}
}

View File

@ -3,37 +3,7 @@
package com.microsoft.aspnet.signalr;
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
class Negotiate {
public static NegotiateResponse processNegotiate(String url, OkHttpClient httpClient) throws IOException {
return processNegotiate(url, httpClient, null);
}
public static NegotiateResponse processNegotiate(String url, OkHttpClient httpClient,String accessTokenHeader) throws IOException {
url = resolveNegotiateUrl(url);
RequestBody body = RequestBody.create(null, new byte[]{});
Request.Builder requestBuilder = new Request.Builder()
.url(url)
.post(body);
if (accessTokenHeader != null) {
requestBuilder.addHeader("Authorization", "Bearer " + accessTokenHeader);
}
Request request = requestBuilder.build();
Response response = httpClient.newCall(request).execute();
String result = response.body().string();
return new NegotiateResponse(result);
}
public static String resolveNegotiateUrl(String url) {
String negotiateUrl = "";

View File

@ -0,0 +1,113 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
class OkHttpWebSocketWrapper extends WebSocketWrapper {
private WebSocket websocketClient;
private String url;
private Map<String, String> headers;
private OkHttpClient client;
private Logger logger;
private OnReceiveCallBack onReceive;
private BiConsumer<Integer, String> onClose;
private CompletableFuture<Void> startFuture = new CompletableFuture<>();
private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
public OkHttpWebSocketWrapper(String url, Map<String, String> headers, OkHttpClient client, Logger logger) {
this.url = url;
this.headers = headers;
this.client = client;
this.logger = logger;
}
@Override
public CompletableFuture<Void> start() {
Headers.Builder headerBuilder = new Headers.Builder();
for (String key : headers.keySet()) {
headerBuilder.add(key, headers.get(key));
}
Request request = new Request.Builder()
.url(url.toString())
.headers(headerBuilder.build())
.build();
this.websocketClient = client.newWebSocket(request, new SignalRWebSocketListener());
return startFuture;
}
@Override
public CompletableFuture<Void> stop() {
websocketClient.close(1000, "HubConnection stopped.");
return closeFuture;
}
@Override
public CompletableFuture<Void> send(String message) {
websocketClient.send(message);
return CompletableFuture.completedFuture(null);
}
@Override
public void setOnReceive(OnReceiveCallBack onReceive) {
this.onReceive = onReceive;
}
@Override
public void setOnClose(BiConsumer<Integer, String> onClose) {
this.onClose = onClose;
}
private class SignalRWebSocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, Response response) {
startFuture.complete(null);
}
@Override
public void onMessage(WebSocket webSocket, String message) {
try {
onReceive.invoke(message);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
onClose.accept(code, reason);
closeFuture.complete(null);
checkStartFailure();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.log(LogLevel.Error, "Error: %s.", t.getMessage());
closeFuture.completeExceptionally(new RuntimeException());
checkStartFailure();
}
private void checkStartFailure() {
// If the start future hasn't completed yet, then we need to complete it
// exceptionally.
if (!startFuture.isDone()) {
String errorMessage = "There was an error starting the Websockets transport.";
logger.log(LogLevel.Debug, errorMessage);
startFuture.completeExceptionally(new RuntimeException(errorMessage));
}
}
}
}

View File

@ -6,9 +6,9 @@ package com.microsoft.aspnet.signalr;
import java.util.concurrent.CompletableFuture;
interface Transport {
CompletableFuture start() throws Exception;
CompletableFuture send(String message);
CompletableFuture<Void> start() throws Exception;
CompletableFuture<Void> send(String message);
void setOnReceive(OnReceiveCallBack callback);
void onReceive(String message) throws Exception;
CompletableFuture stop();
CompletableFuture<Void> stop();
}

View File

@ -3,67 +3,56 @@
package com.microsoft.aspnet.signalr;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import okhttp3.*;
class WebSocketTransport implements Transport {
private WebSocket websocketClient;
private SignalRWebSocketListener webSocketListener;
private WebSocketWrapper webSocketClient;
private OnReceiveCallBack onReceiveCallBack;
private URI url;
private String url;
private Logger logger;
private HttpClient client;
private Map<String, String> headers;
private OkHttpClient httpClient;
private CompletableFuture<Void> startFuture = new CompletableFuture<>();
private static final String HTTP = "http";
private static final String HTTPS = "https";
private static final String WS = "ws";
private static final String WSS = "wss";
public WebSocketTransport(String url, Logger logger, Map<String, String> headers) throws URISyntaxException {
public WebSocketTransport(String url, Map<String, String> headers, HttpClient client, Logger logger) throws URISyntaxException {
this.url = formatUrl(url);
this.logger = logger;
this.client = client;
this.headers = headers;
this.httpClient = new OkHttpClient();
}
public WebSocketTransport(String url, Logger logger, Map<String, String> headers, OkHttpClient httpClient) throws URISyntaxException {
this.url = formatUrl(url);
this.logger = logger;
this.headers = headers;
this.httpClient = httpClient;
}
public URI getUrl() {
String getUrl() {
return url;
}
private URI formatUrl(String url) throws URISyntaxException {
private String formatUrl(String url) throws URISyntaxException {
if (url.startsWith(HTTPS)) {
url = WSS + url.substring(HTTPS.length());
} else if (url.startsWith(HTTP)) {
url = WS + url.substring(HTTP.length());
}
return new URI(url);
return url;
}
@Override
public CompletableFuture start() {
logger.log(LogLevel.Debug, "Starting Websocket connection.");
webSocketListener = new SignalRWebSocketListener();
websocketClient = createUpdatedWebSocket(webSocketListener);
return startFuture;
public CompletableFuture<Void> start() {
logger.log(LogLevel.Debug, "Starting Websocket connection.");
this.webSocketClient = client.createWebSocket(this.url.toString(), this.headers);
this.webSocketClient.setOnReceive((message) -> onReceive(message));
this.webSocketClient.setOnClose((code, reason) -> onClose(code, reason));
return webSocketClient.start().thenRun(() -> logger.log(LogLevel.Information, "WebSocket transport connected to: %s.", this.url));
}
@Override
public CompletableFuture send(String message) {
return CompletableFuture.runAsync(() -> websocketClient.send(message));
public CompletableFuture<Void> send(String message) {
return webSocketClient.send(message);
}
@Override
@ -78,64 +67,12 @@ class WebSocketTransport implements Transport {
}
@Override
public CompletableFuture stop() {
return CompletableFuture.runAsync(() -> {
websocketClient.close(1000, "HubConnection stopped.");
logger.log(LogLevel.Information, "WebSocket connection stopped");
});
public CompletableFuture<Void> stop() {
return webSocketClient.stop().whenComplete((i, j) -> logger.log(LogLevel.Information, "WebSocket connection stopped."));
}
private WebSocket createUpdatedWebSocket(WebSocketListener webSocketListener) {
Headers.Builder headerBuilder = new Headers.Builder();
for (String key: headers.keySet()) {
headerBuilder.add(key, headers.get(key));
}
Request request = new Request.Builder().url(url.toString())
.headers(headerBuilder.build())
.build();
return this.httpClient.newWebSocket(request, webSocketListener);
}
private class SignalRWebSocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, Response response) {
startFuture.complete(null);
logger.log(LogLevel.Information, "WebSocket transport connected to: %s", websocketClient.request().url());
}
@Override
public void onMessage(WebSocket webSocket, String message) {
try {
onReceive(message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
logger.log(LogLevel.Information, "WebSocket connection stopping with " +
"code %d and reason %s", code, reason);
// If the start future hasn't completed yet, then we need to complete it exceptionally.
checkStartFailure();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.log(LogLevel.Error, "Error : %d", t.getMessage());
// If the start future hasn't completed yet, then we need to complete it exceptionally.
checkStartFailure();
}
}
private void checkStartFailure() {
// If the start future hasn't completed yet, then we need to complete it exceptionally.
if (!startFuture.isDone()) {
String errorMessage = "There was an error starting the Websockets transport.";
logger.log(LogLevel.Debug, errorMessage);
startFuture.completeExceptionally(new RuntimeException(errorMessage));
}
void onClose(int code, String reason) {
logger.log(LogLevel.Information, "WebSocket connection stopping with " +
"code %d and reason '%s'.", code, reason);
}
}

View File

@ -0,0 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
abstract class WebSocketWrapper {
public abstract CompletableFuture<Void> start();
public abstract CompletableFuture<Void> stop();
public abstract CompletableFuture<Void> send(String message);
public abstract void setOnReceive(OnReceiveCallBack onReceive);
public abstract void setOnClose(BiConsumer<Integer, String> onClose);
}

View File

@ -5,9 +5,12 @@ package com.microsoft.aspnet.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
@ -19,7 +22,7 @@ class HubConnectionTest {
@Test
public void checkHubConnectionState() throws Exception {
Transport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -45,7 +48,7 @@ class HubConnectionTest {
@Test
public void hubConnectionClosesAfterCloseMessage() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -60,7 +63,7 @@ class HubConnectionTest {
@Test
public void hubConnectionReceiveHandshakeResponseWithError() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
Throwable exception = assertThrows(HubException.class, () -> mockTransport.receiveMessage("{\"error\":\"Requested protocol 'messagepack' is not available.\"}" + RECORD_SEPARATOR));
@ -71,7 +74,7 @@ class HubConnectionTest {
public void registeringMultipleHandlersAndBothGetTriggered() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
hubConnection.on("inc", action);
@ -97,7 +100,7 @@ class HubConnectionTest {
public void removeHandlerByName() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
hubConnection.on("inc", action);
@ -124,7 +127,7 @@ class HubConnectionTest {
public void addAndRemoveHandlerImmediately() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
hubConnection.on("inc", action);
@ -149,7 +152,7 @@ class HubConnectionTest {
public void removingMultipleHandlersWithOneCallToRemove() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
Action secondAction = () -> value.getAndUpdate((val) -> val + 2);
@ -181,7 +184,7 @@ class HubConnectionTest {
public void removeHandlerWithUnsubscribe() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
Subscription subscription = hubConnection.on("inc", action);
@ -214,7 +217,7 @@ class HubConnectionTest {
public void unsubscribeTwice() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
Subscription subscription = hubConnection.on("inc", action);
@ -248,7 +251,7 @@ class HubConnectionTest {
public void removeSingleHandlerWithUnsubscribe() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
Action secondAction = () -> value.getAndUpdate((val) -> val + 2);
@ -278,7 +281,7 @@ class HubConnectionTest {
public void addAndRemoveHandlerImmediatelyWithSubscribe() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action action = () -> value.getAndUpdate((val) -> val + 1);
Subscription sub = hubConnection.on("inc", action);
@ -303,7 +306,7 @@ class HubConnectionTest {
public void registeringMultipleHandlersThatTakeParamsAndBothGetTriggered() throws Exception {
AtomicReference<Double> value = new AtomicReference<>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
Action1<Double> action = (number) -> value.getAndUpdate((val) -> val + number);
@ -322,7 +325,7 @@ class HubConnectionTest {
@Test
public void invokeWaitsForCompletionMessage() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -339,7 +342,7 @@ class HubConnectionTest {
@Test
public void multipleInvokesWaitForOwnCompletionMessage() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -362,7 +365,7 @@ class HubConnectionTest {
@Test
public void invokeWorksForPrimitiveTypes() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -380,7 +383,7 @@ class HubConnectionTest {
@Test
public void completionMessageCanHaveError() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -404,7 +407,7 @@ class HubConnectionTest {
@Test
public void stopCancelsActiveInvokes() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
@ -429,7 +432,7 @@ class HubConnectionTest {
public void sendWithNoParamsTriggersOnHandler() throws Exception {
AtomicReference<Integer> value = new AtomicReference<>(0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", () ->{
assertEquals(Integer.valueOf(0), value.get());
@ -448,7 +451,7 @@ class HubConnectionTest {
public void sendWithParamTriggersOnHandler() throws Exception {
AtomicReference<String> value = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param) ->{
assertNull(value.get());
@ -470,7 +473,7 @@ class HubConnectionTest {
AtomicReference<Double> value2 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2) ->{
assertNull(value1.get());
@ -497,7 +500,7 @@ class HubConnectionTest {
AtomicReference<String> value3 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3) ->{
assertNull(value1.get());
@ -528,7 +531,7 @@ class HubConnectionTest {
AtomicReference<String> value4 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3, param4) ->{
assertNull(value1.get());
@ -562,7 +565,7 @@ class HubConnectionTest {
AtomicReference<Double> value5 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3, param4, param5) ->{
assertNull(value1.get());
@ -600,7 +603,7 @@ class HubConnectionTest {
AtomicReference<String> value6 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3, param4, param5, param6) -> {
assertNull(value1.get());
@ -642,7 +645,7 @@ class HubConnectionTest {
AtomicReference<String> value7 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3, param4, param5, param6, param7) -> {
assertNull(value1.get());
@ -688,7 +691,7 @@ class HubConnectionTest {
AtomicReference<String> value8 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1, param2, param3, param4, param5, param6, param7, param8) -> {
assertNull(value1.get());
@ -735,7 +738,7 @@ class HubConnectionTest {
AtomicReference<Custom> value1 = new AtomicReference<>();
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", (param1) -> {
assertNull(value1.get());
@ -760,7 +763,7 @@ class HubConnectionTest {
public void receiveHandshakeResponseAndMessage() throws Exception {
AtomicReference<Double> value = new AtomicReference<Double>(0.0);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("inc", () ->{
assertEquals(Double.valueOf(0), value.get());
@ -782,7 +785,7 @@ class HubConnectionTest {
public void onClosedCallbackRunsWhenStopIsCalled() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
Transport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
hubConnection.onClosed((ex) -> {
assertNull(value1.get());
@ -799,7 +802,7 @@ class HubConnectionTest {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Transport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
hubConnection.onClosed((ex) -> {
@ -824,7 +827,7 @@ class HubConnectionTest {
@Test
public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.onClosed((ex) -> {
assertEquals(ex.getMessage(), "There was an error");
});
@ -841,7 +844,7 @@ class HubConnectionTest {
@Test
public void callingStartOnStartedHubConnectionNoOps() throws Exception {
Transport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger() ,true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.start();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -855,7 +858,7 @@ class HubConnectionTest {
@Test
public void cannotSendBeforeStart() throws Exception {
Transport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
Throwable exception = assertThrows(HubException.class, () -> hubConnection.send("inc"));
@ -865,7 +868,7 @@ class HubConnectionTest {
@Test
public void errorWhenReceivingInvokeWithIncorrectArgumentLength() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true);
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient());
hubConnection.on("Send", (s) -> {
assertTrue(false);
}, String.class);
@ -877,4 +880,90 @@ class HubConnectionTest {
RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR));
assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage());
}
@Test
public void negotiateSentOnStart() {
TestHttpClient client = new TestHttpClient()
.on("POST", (req) -> CompletableFuture.completedFuture(new HttpResponse(404, "", "")));
HubConnection hubConnection = new HubConnectionBuilder()
.withUrl("http://example.com")
.configureHttpClient(client)
.build();
try {
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
} catch(Exception ex) {}
List<HttpRequest> sentRequests = client.getSentRequests();
assertEquals(1, sentRequests.size());
assertEquals("http://example.com/negotiate", sentRequests.get(0).getUrl());
}
@Test
public void negotiateThatRedirectsForeverFailsAfter100Tries() throws InterruptedException, TimeoutException, Exception {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}")));
HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com")
.configureHttpClient(client).build();
ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS));
assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getMessage());
}
@Test
public void afterSuccessfulNegotiateConnectsWithTransport() throws InterruptedException, TimeoutException, Exception {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "",
"{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
MockTransport transport = new MockTransport();
HttpConnectionOptions options = new HttpConnectionOptions();
options.setTransport(transport);
HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options)
.configureHttpClient(client).build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
String[] sentMessages = transport.getSentMessages();
assertEquals(1, sentMessages.length);
assertEquals("{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR, sentMessages[0]);
}
@Test
public void negotiateThatReturnsErrorThrowsFromStart() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"error\":\"Test error.\"}")));
MockTransport transport = new MockTransport();
HttpConnectionOptions options = new HttpConnectionOptions();
options.setTransport(transport);
HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options)
.configureHttpClient(client).build();
ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS));
assertEquals("Test error.", exception.getCause().getMessage());
}
@Test
public void negotiateRedirectIsFollowed()
throws InterruptedException, ExecutionException, TimeoutException, Exception {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
MockTransport transport = new MockTransport();
HttpConnectionOptions options = new HttpConnectionOptions();
options.setTransport(transport);
HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options)
.configureHttpClient(client).build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
}
}

View File

@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
class TestHttpClient extends HttpClient {
private Function<HttpRequest, CompletableFuture<HttpResponse>> handler;
private List<HttpRequest> sentRequests;
public TestHttpClient() {
this.sentRequests = new ArrayList<>();
this.handler = (req) -> {
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl())));
return future;
};
}
@Override
public CompletableFuture<HttpResponse> send(HttpRequest request) {
this.sentRequests.add(request);
return this.handler.apply(request);
}
public List<HttpRequest> getSentRequests() {
return sentRequests;
}
public TestHttpClient on(Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
this.handler = (req) -> handler.apply(req);
return this;
}
public TestHttpClient on(String method, Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
Function<HttpRequest, CompletableFuture<HttpResponse>> oldHandler = this.handler;
this.handler = (req) -> {
if (req.getMethod().equals(method)) {
return handler.apply(req);
}
return oldHandler.apply(req);
};
return this;
}
public TestHttpClient on(String method, String url, Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
Function<HttpRequest, CompletableFuture<HttpResponse>> oldHandler = this.handler;
this.handler = (req) -> {
if (req.getMethod().equals(method) && req.getUrl().equals(url)) {
return handler.apply(req);
}
return oldHandler.apply(req);
};
return this;
}
@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
throw new RuntimeException("WebSockets isn't supported in testing currently.");
}
}

View File

@ -6,6 +6,7 @@ package com.microsoft.aspnet.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@ -13,8 +14,8 @@ import org.junit.jupiter.api.Test;
class WebSocketTransportTest {
@Test
public void WebsocketThrowsIfItCantConnect() throws Exception {
Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new NullLogger(), new HashMap<>());
Throwable exception = assertThrows(Exception.class, () -> transport.start().get(1,TimeUnit.SECONDS));
Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start().get(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
}
}

View File

@ -25,7 +25,7 @@ class WebSocketTransportUrlFormatTest {
@ParameterizedTest
@MethodSource("protocols")
public void checkWebsocketUrlProtocol(String url, String expectedUrl) throws URISyntaxException {
WebSocketTransport webSocketTransport = new WebSocketTransport(url, new NullLogger(), new HashMap<>());
assertEquals(expectedUrl, webSocketTransport.getUrl().toString());
WebSocketTransport webSocketTransport = new WebSocketTransport(url, new HashMap<>(), new TestHttpClient(), new NullLogger());
assertEquals(expectedUrl, webSocketTransport.getUrl());
}
}

View File

@ -27,7 +27,7 @@ namespace Microsoft.AspNetCore.Internal
// remaining bits (%x0000000) are the lowest bits of the value. The most significant bit of the second
// byte is 0 meaning this is last byte of the VarInt. The actual value bits (%x0101001) need to be
// prepended to the bits we already read so the values is %01010010000000 i.e. 0x1480 (5248)
// We support paylads up to 2GB so the biggest number we support is 7fffffff which when encoded as
// We support payloads up to 2GB so the biggest number we support is 7fffffff which when encoded as
// VarInt is 0xFF 0xFF 0xFF 0xFF 0x07 - hence the maximum length prefix is 5 bytes.
var length = 0U;

View File

@ -155,7 +155,7 @@ namespace Microsoft.AspNetCore.Internal
public static bool ReadForType(JsonTextReader reader, Type type)
{
// Explicity read values as dates from JSON with reader.
// Explicitly read values as dates from JSON with reader.
// We do this because otherwise dates are read as strings
// and the JsonSerializer will use a conversion method that won't
// preserve UTC in DateTime.Kind for UTC ISO8601 dates

View File

@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal
reader = new Utf8BufferTextReader();
}
// Taken off the the thread static
// Taken off the thread static
_cachedInstance = null;
#if DEBUG
if (reader._inUse)

View File

@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Internal
writer = new Utf8BufferTextWriter();
}
// Taken off the the thread static
// Taken off the thread static
_cachedInstance = null;
#if DEBUG
if (writer._inUse)

View File

@ -529,7 +529,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client
httpMessageHandler = new AccessTokenHttpMessageHandler(httpMessageHandler, this);
}
// Wrap message handler after HttpMessageHandlerFactory to ensure not overriden
// Wrap message handler after HttpMessageHandlerFactory to ensure not overridden
httpMessageHandler = new LoggingHttpMessageHandler(httpMessageHandler, _loggerFactory);
var httpClient = new HttpClient(httpMessageHandler);

View File

@ -299,7 +299,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
// If the status code is a 204 it means the connection is done
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
{
// Cancel current request to release any waiting poll and let dispose aquire the lock
// Cancel current request to release any waiting poll and let dispose acquire the lock
currentRequestTcs.TrySetCanceled();
// We should be able to safely dispose because there's no more data being written
@ -312,7 +312,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
}
else if (resultTask.IsFaulted)
{
// Cancel current request to release any waiting poll and let dispose aquire the lock
// Cancel current request to release any waiting poll and let dispose acquire the lock
currentRequestTcs.TrySetCanceled();
// transport task was faulted, we should remove the connection

View File

@ -801,7 +801,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
// Adjust consumed and examined to point to the end of the handshake
// response, this handles the case where invocations are sent in the same payload
// as the the negotiate response.
// as the negotiate response.
consumed = buffer.Start;
examined = consumed;

View File

@ -12,7 +12,7 @@ namespace Microsoft.AspNetCore.SignalR
/// <summary>
/// Gets the user ID for the specified connection.
/// </summary>
/// <param name="connection">The connection get get the user ID for.</param>
/// <param name="connection">The connection to get the user ID for.</param>
/// <returns>The user ID for the specified connection.</returns>
string GetUserId(HubConnectionContext connection);
}

View File

@ -328,7 +328,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await sseTransport.Output.WriteAsync(new byte[] { 0x42 });
// For for send request to be in progress
// For send request to be in progress
await sendSyncPoint.WaitForSyncPoint();
var stopTask = sseTransport.StopAsync();

View File

@ -81,7 +81,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
// create and run docker container, remove automatically when stopped, map 6379 from the container to 6379 localhost
// use static name 'redisTestContainer' so if the container doesn't get removed we don't keep adding more
// use redis base docker image
// 20 second timeout to allow redis image to be downloaded, should be a rare occurance, only happening when a new version is released
// 20 second timeout to allow redis image to be downloaded, should be a rare occurrence, only happening when a new version is released
RunProcessAndThrowIfFailed(_path, $"run --rm -p 6379:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20));
// inspect the redis docker image and extract the IPAddress. Necessary when running tests from inside a docker container, spinning up a new docker container for redis