diff --git a/benchmarkapps/Crankier/Worker.cs b/benchmarkapps/Crankier/Worker.cs index ed41709659..6a39616574 100644 --- a/benchmarkapps/Crankier/Worker.cs +++ b/benchmarkapps/Crankier/Worker.cs @@ -68,7 +68,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier await client.CreateAndStartConnectionAsync(targetAddress, transportType); } - Log("Connections connected succesfully"); + Log("Connections connected successfully"); } public Task StartTestAsync(TimeSpan sendInterval, int sendBytes) @@ -80,7 +80,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier client.StartTest(sendBytes, sendInterval); } - Log("Test started succesfully"); + Log("Test started successfully"); return Task.CompletedTask; } @@ -98,7 +98,7 @@ namespace Microsoft.AspNetCore.SignalR.Crankier } _sendStatusCts.Cancel(); - Log("Connections stopped succesfully"); + Log("Connections stopped successfully"); _targetConnectionCount = 0; return Task.CompletedTask; diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/DefaultHttpClient.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/DefaultHttpClient.java new file mode 100644 index 0000000000..05e86f2ec9 --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/DefaultHttpClient.java @@ -0,0 +1,126 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.aspnet.signalr; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Cookie; +import okhttp3.CookieJar; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; + +class DefaultHttpClient extends HttpClient { + private OkHttpClient client; + private Logger logger; + + public DefaultHttpClient(Logger logger) { + this.logger = logger; + this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() { + private List cookieList = new ArrayList<>(); + private Lock cookieLock = new ReentrantLock(); + + @Override + public void saveFromResponse(HttpUrl url, List cookies) { + cookieLock.lock(); + try { + for (Cookie cookie : cookies) { + boolean replacedCookie = false; + for (int i = 0; i < cookieList.size(); i++) { + Cookie innerCookie = cookieList.get(i); + if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) { + // We have a new cookie that matches an older one so we replace the older one. + cookieList.set(i, innerCookie); + replacedCookie = true; + break; + } + } + if (!replacedCookie) { + cookieList.add(cookie); + } + } + } finally { + cookieLock.unlock(); + } + } + + @Override + public List loadForRequest(HttpUrl url) { + cookieLock.lock(); + try { + List matchedCookies = new ArrayList<>(); + List expiredCookies = new ArrayList<>(); + for (Cookie cookie : cookieList) { + if (cookie.expiresAt() < System.currentTimeMillis()) { + expiredCookies.add(cookie); + } else if (cookie.matches(url)) { + matchedCookies.add(cookie); + } + } + + cookieList.removeAll(expiredCookies); + return matchedCookies; + } finally { + cookieLock.unlock(); + } + } + }).build(); + } + + @Override + public CompletableFuture send(HttpRequest httpRequest) { + Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl()); + if (httpRequest.getMethod() == "GET") { + requestBuilder.get(); + } else if (httpRequest.getMethod() == "POST") { + RequestBody body = RequestBody.create(null, new byte[] {}); + requestBuilder.post(body); + } else if (httpRequest.getMethod() == "DELETE") { + requestBuilder.delete(); + } + + if (httpRequest.getHeaders() != null) { + httpRequest.getHeaders().forEach((key, value) -> { + requestBuilder.addHeader(key, value); + }); + } + + Request request = requestBuilder.build(); + + CompletableFuture responseFuture = new CompletableFuture<>(); + + client.newCall(request).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + responseFuture.completeExceptionally(e.getCause()); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + try (ResponseBody body = response.body()) { + HttpResponse httpResponse = new HttpResponse(response.code(), response.message(), body.string()); + responseFuture.complete(httpResponse); + } + } + }); + + return responseFuture; + } + + @Override + public WebSocketWrapper createWebSocket(String url, Map headers) { + return new OkHttpWebSocketWrapper(url, headers, client, logger); + } +} \ No newline at end of file diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HandshakeProtocol.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HandshakeProtocol.java index d35925d608..74f5e15e1f 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HandshakeProtocol.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HandshakeProtocol.java @@ -6,7 +6,7 @@ package com.microsoft.aspnet.signalr; import com.google.gson.Gson; class HandshakeProtocol { - public static Gson gson = new Gson(); + private static Gson gson = new Gson(); private static final String RECORD_SEPARATOR = "\u001e"; public static String createHandshakeRequestMessage(HandshakeRequestMessage message) { diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HttpClient.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HttpClient.java new file mode 100644 index 0000000000..c5b71b4f9f --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HttpClient.java @@ -0,0 +1,122 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.aspnet.signalr; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +class HttpRequest { + private String method; + private String url; + private Map headers = new HashMap<>(); + + public void setMethod(String method) { + this.method = method; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setHeader(String key, String value) { + this.headers.put(key, value); + } + + public void setHeaders(Map headers) { + headers.forEach((key, value) -> { + this.headers.put(key, value); + }); + } + + public String getMethod() { + return method; + } + + public String getUrl() { + return url; + } + + public Map getHeaders() { + return headers; + } +} + +class HttpResponse { + private int statusCode; + private String statusText; + private String content = null; + + public HttpResponse(int statusCode) { + this.statusCode = statusCode; + } + + public HttpResponse(int statusCode, String statusText) { + this.statusCode = statusCode; + this.statusText = statusText; + } + + public HttpResponse(int statusCode, String statusText, String content) { + this.statusCode = statusCode; + this.statusText = statusText; + this.content = content; + } + + public String getContent() { + return content; + } + + public int getStatusCode() { + return statusCode; + } + + public String getStatusText() { + return statusText; + } +} + +abstract class HttpClient { + public CompletableFuture get(String url) { + HttpRequest request = new HttpRequest(); + request.setUrl(url); + request.setMethod("GET"); + return this.send(request); + } + + public CompletableFuture get(String url, HttpRequest options) { + options.setUrl(url); + options.setMethod("GET"); + return this.send(options); + } + + public CompletableFuture post(String url) { + HttpRequest request = new HttpRequest(); + request.setUrl(url); + request.setMethod("POST"); + return this.send(request); + } + + public CompletableFuture post(String url, HttpRequest options) { + options.setUrl(url); + options.setMethod("POST"); + return this.send(options); + } + + public CompletableFuture delete(String url) { + HttpRequest request = new HttpRequest(); + request.setUrl(url); + request.setMethod("DELETE"); + return this.send(request); + } + + public CompletableFuture delete(String url, HttpRequest options) { + options.setUrl(url); + options.setMethod("DELETE"); + return this.send(options); + } + + public abstract CompletableFuture send(HttpRequest request); + + public abstract WebSocketWrapper createWebSocket(String url, Map headers); +} \ No newline at end of file diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnection.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnection.java index b3fc059470..8d84394b32 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnection.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnection.java @@ -4,18 +4,18 @@ package com.microsoft.aspnet.signalr; import java.io.IOException; -import java.util.*; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import okhttp3.Cookie; -import okhttp3.CookieJar; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; - public class HubConnection { private String url; private Transport transport; @@ -29,16 +29,15 @@ public class HubConnection { private Logger logger; private List> onClosedCallbackList; private boolean skipNegotiate = false; - private NegotiateResponse negotiateResponse; private String accessToken; private Map headers = new HashMap<>(); private ConnectionState connectionState = null; - private OkHttpClient httpClient; + private HttpClient httpClient; private static ArrayList> emptyArray = new ArrayList<>(); private static int MAX_NEGOTIATE_ATTEMPTS = 100; - public HubConnection(String url, Transport transport, Logger logger, boolean skipNegotiate) { + public HubConnection(String url, Transport transport, Logger logger, boolean skipNegotiate, HttpClient client) { if (url == null || url.isEmpty()) { throw new IllegalArgumentException("A valid url is required."); } @@ -52,64 +51,18 @@ public class HubConnection { this.logger = new NullLogger(); } + if (client != null) { + this.httpClient = client; + } else { + this.httpClient = new DefaultHttpClient(this.logger); + } + if (transport != null) { this.transport = transport; } this.skipNegotiate = skipNegotiate; - this.httpClient = new OkHttpClient.Builder() - .cookieJar(new CookieJar() { - private List cookieList = new ArrayList<>(); - private Lock cookieLock = new ReentrantLock(); - - @Override - public void saveFromResponse(HttpUrl url, List cookies) { - cookieLock.lock(); - try { - for (Cookie cookie : cookies) { - boolean replacedCookie = false; - for (int i = 0; i < cookieList.size(); i++) { - Cookie innerCookie = cookieList.get(i); - if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) { - // We have a new cookie that matches an older one so we replace the older one. - cookieList.set(i, innerCookie); - replacedCookie = true; - break; - } - } - if (!replacedCookie) { - cookieList.add(cookie); - } - } - } finally { - cookieLock.unlock(); - } - } - - @Override - public List loadForRequest(HttpUrl url) { - cookieLock.lock(); - try { - List matchedCookies = new ArrayList<>(); - List expiredCookies = new ArrayList<>(); - for (Cookie cookie : cookieList) { - if (cookie.expiresAt() < System.currentTimeMillis()) { - expiredCookies.add(cookie); - } else if (cookie.matches(url)) { - matchedCookies.add(cookie); - } - } - - cookieList.removeAll(expiredCookies); - return matchedCookies; - } finally { - cookieLock.unlock(); - } - } - }) - .build(); - this.callback = (payload) -> { if (!handshakeReceived) { @@ -174,30 +127,40 @@ public class HubConnection { }; } - private NegotiateResponse handleNegotiate() throws IOException, HubException { - accessToken = (negotiateResponse == null) ? null : negotiateResponse.getAccessToken(); - negotiateResponse = Negotiate.processNegotiate(url, httpClient, accessToken); + private CompletableFuture handleNegotiate() throws IOException, InterruptedException, ExecutionException { + HttpRequest request = new HttpRequest(); + request.setHeaders(this.headers); - if (negotiateResponse.getError() != null) { - throw new HubException(negotiateResponse.getError()); - } - if (negotiateResponse.getConnectionId() != null) { - if (url.contains("?")) { - url = url + "&id=" + negotiateResponse.getConnectionId(); - } else { - url = url + "?id=" + negotiateResponse.getConnectionId(); + return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).thenCompose((response) -> { + NegotiateResponse negotiateResponse; + try { + negotiateResponse = new NegotiateResponse(response.getContent()); + } catch (IOException e) { + throw new RuntimeException(e); } - } - if (negotiateResponse.getAccessToken() != null) { - this.headers.put("Authorization", "Bearer " + negotiateResponse.getAccessToken()); - } + if (negotiateResponse.getError() != null) { + throw new RuntimeException(negotiateResponse.getError()); + } - if (negotiateResponse.getRedirectUrl() != null) { - this.url = this.negotiateResponse.getRedirectUrl(); - } + if (negotiateResponse.getConnectionId() != null) { + if (url.contains("?")) { + url = url + "&id=" + negotiateResponse.getConnectionId(); + } else { + url = url + "?id=" + negotiateResponse.getConnectionId(); + } + } - return negotiateResponse; + if (negotiateResponse.getAccessToken() != null) { + this.headers.put("Authorization", "Bearer " + negotiateResponse.getAccessToken()); + } + + if (negotiateResponse.getRedirectUrl() != null) { + this.url = negotiateResponse.getRedirectUrl(); + } + + return CompletableFuture.completedFuture(negotiateResponse); + }); } /** @@ -214,53 +177,96 @@ public class HubConnection { * * @throws Exception An error occurred while connecting. */ - public CompletableFuture start() throws Exception { + public CompletableFuture start() throws Exception { if (hubConnectionState != HubConnectionState.DISCONNECTED) { return CompletableFuture.completedFuture(null); } + + CompletableFuture negotiate = null; if (!skipNegotiate) { - int negotiateAttempts = 0; - do { - accessToken = (negotiateResponse == null) ? null : negotiateResponse.getAccessToken(); - negotiateResponse = handleNegotiate(); - negotiateAttempts++; - } while (negotiateResponse.getRedirectUrl() != null && negotiateAttempts < MAX_NEGOTIATE_ATTEMPTS); - if (!negotiateResponse.getAvailableTransports().contains("WebSockets")) { - throw new HubException("There were no compatible transports on the server."); + negotiate = startNegotiate(0); + } else { + negotiate = CompletableFuture.completedFuture(null); + } + + return negotiate.thenCompose((response) -> { + // If we didn't skip negotiate and got a null response then exit start because we + // are probably disconnected + if (response == null && !skipNegotiate) { + return CompletableFuture.completedFuture(null); } - } - logger.log(LogLevel.Debug, "Starting HubConnection"); - if (transport == null) { - transport = new WebSocketTransport(url, logger, headers, httpClient); - } - - transport.setOnReceive(this.callback); - return transport.start().thenCompose((future) -> { - String handshake = HandshakeProtocol.createHandshakeRequestMessage(new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); - return transport.send(handshake).thenRun(() -> { - hubConnectionStateLock.lock(); + logger.log(LogLevel.Debug, "Starting HubConnection"); + if (transport == null) { try { - hubConnectionState = HubConnectionState.CONNECTED; - connectionState = new ConnectionState(this); - logger.log(LogLevel.Information, "HubConnected started."); - } finally { - hubConnectionStateLock.unlock(); + transport = new WebSocketTransport(url, headers, httpClient, logger); + } catch (URISyntaxException e) { + throw new RuntimeException(e); } - }); - }); + } + transport.setOnReceive(this.callback); + + try { + return transport.start().thenCompose((future) -> { + String handshake = HandshakeProtocol.createHandshakeRequestMessage( + new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); + return transport.send(handshake).thenRun(() -> { + hubConnectionStateLock.lock(); + try { + hubConnectionState = HubConnectionState.CONNECTED; + connectionState = new ConnectionState(this); + logger.log(LogLevel.Information, "HubConnection started."); + } finally { + hubConnectionStateLock.unlock(); + } + }); + }); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private CompletableFuture startNegotiate(int negotiateAttempts) throws IOException, InterruptedException, ExecutionException { + if (hubConnectionState != HubConnectionState.DISCONNECTED) { + return CompletableFuture.completedFuture(null); + } + + return handleNegotiate().thenCompose((response) -> { + if (response.getRedirectUrl() != null && negotiateAttempts >= MAX_NEGOTIATE_ATTEMPTS) { + throw new RuntimeException("Negotiate redirection limit exceeded."); + } + + if (response.getRedirectUrl() == null) { + if (!response.getAvailableTransports().contains("WebSockets")) { + try { + throw new HubException("There were no compatible transports on the server."); + } catch (HubException e) { + throw new RuntimeException(e); + } + } + return CompletableFuture.completedFuture(response); + } + + try { + return startNegotiate(negotiateAttempts + 1); + } catch (IOException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); } /** * Stops a connection to the server. */ - private void stop(String errorMessage) { - HubException hubException = null; + private CompletableFuture stop(String errorMessage) { hubConnectionStateLock.lock(); try { if (hubConnectionState == HubConnectionState.DISCONNECTED) { - return; + return CompletableFuture.completedFuture(null); } if (errorMessage != null) { @@ -268,32 +274,42 @@ public class HubConnection { } else { logger.log(LogLevel.Debug, "Stopping HubConnection."); } - - transport.stop(); - hubConnectionState = HubConnectionState.DISCONNECTED; - - if (errorMessage != null) { - hubException = new HubException(errorMessage); - } - connectionState.cancelOutstandingInvocations(hubException); - connectionState = null; - logger.log(LogLevel.Information, "HubConnection stopped."); } finally { hubConnectionStateLock.unlock(); } - if (onClosedCallbackList != null) { - for (Consumer callback : onClosedCallbackList) { - callback.accept(hubException); + return transport.stop().whenComplete((i, t) -> { + HubException hubException = null; + hubConnectionStateLock.lock(); + try { + hubConnectionState = HubConnectionState.DISCONNECTED; + + if (errorMessage != null) { + hubException = new HubException(errorMessage); + } else if (t != null) { + hubException = new HubException(t.getMessage()); + } + connectionState.cancelOutstandingInvocations(hubException); + connectionState = null; + logger.log(LogLevel.Information, "HubConnection stopped."); + } finally { + hubConnectionStateLock.unlock(); } - } + + // Do not run these callbacks inside the hubConnectionStateLock + if (onClosedCallbackList != null) { + for (Consumer callback : onClosedCallbackList) { + callback.accept(hubException); + } + } + }); } /** * Stops a connection to the server. */ - public void stop() { - stop(null); + public CompletableFuture stop() { + return stop(null); } /** diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnectionBuilder.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnectionBuilder.java index b677240c1d..e9c4dc94ef 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnectionBuilder.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/HubConnectionBuilder.java @@ -8,6 +8,7 @@ public class HubConnectionBuilder { private Transport transport; private Logger logger; private boolean skipNegotiate; + private HttpClient client; public HubConnectionBuilder withUrl(String url) { if (url == null || url.isEmpty()) { @@ -49,10 +50,16 @@ public class HubConnectionBuilder { return this; } + // For testing purposes only + HubConnectionBuilder configureHttpClient(HttpClient client) { + this.client = client; + return this; + } + public HubConnection build() { if (this.url == null) { throw new RuntimeException("The 'HubConnectionBuilder.withUrl' method must be called before building the connection."); } - return new HubConnection(url, transport, logger, skipNegotiate); + return new HubConnection(url, transport, logger, skipNegotiate, client); } } \ No newline at end of file diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Negotiate.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Negotiate.java index a90b71c476..f5f47a794d 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Negotiate.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Negotiate.java @@ -3,37 +3,7 @@ package com.microsoft.aspnet.signalr; -import java.io.IOException; - -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; - class Negotiate { - - public static NegotiateResponse processNegotiate(String url, OkHttpClient httpClient) throws IOException { - return processNegotiate(url, httpClient, null); - } - - public static NegotiateResponse processNegotiate(String url, OkHttpClient httpClient,String accessTokenHeader) throws IOException { - url = resolveNegotiateUrl(url); - RequestBody body = RequestBody.create(null, new byte[]{}); - Request.Builder requestBuilder = new Request.Builder() - .url(url) - .post(body); - - if (accessTokenHeader != null) { - requestBuilder.addHeader("Authorization", "Bearer " + accessTokenHeader); - } - - Request request = requestBuilder.build(); - - Response response = httpClient.newCall(request).execute(); - String result = response.body().string(); - return new NegotiateResponse(result); - } - public static String resolveNegotiateUrl(String url) { String negotiateUrl = ""; diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/OkHttpWebSocketWrapper.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/OkHttpWebSocketWrapper.java new file mode 100644 index 0000000000..98ee93aea0 --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/OkHttpWebSocketWrapper.java @@ -0,0 +1,113 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.aspnet.signalr; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; + +class OkHttpWebSocketWrapper extends WebSocketWrapper { + private WebSocket websocketClient; + private String url; + private Map headers; + private OkHttpClient client; + private Logger logger; + private OnReceiveCallBack onReceive; + private BiConsumer onClose; + private CompletableFuture startFuture = new CompletableFuture<>(); + private CompletableFuture closeFuture = new CompletableFuture<>(); + + public OkHttpWebSocketWrapper(String url, Map headers, OkHttpClient client, Logger logger) { + this.url = url; + this.headers = headers; + this.client = client; + this.logger = logger; + } + + @Override + public CompletableFuture start() { + Headers.Builder headerBuilder = new Headers.Builder(); + for (String key : headers.keySet()) { + headerBuilder.add(key, headers.get(key)); + } + + Request request = new Request.Builder() + .url(url.toString()) + .headers(headerBuilder.build()) + .build(); + + this.websocketClient = client.newWebSocket(request, new SignalRWebSocketListener()); + return startFuture; + } + + @Override + public CompletableFuture stop() { + websocketClient.close(1000, "HubConnection stopped."); + return closeFuture; + } + + @Override + public CompletableFuture send(String message) { + websocketClient.send(message); + return CompletableFuture.completedFuture(null); + } + + @Override + public void setOnReceive(OnReceiveCallBack onReceive) { + this.onReceive = onReceive; + } + + @Override + public void setOnClose(BiConsumer onClose) { + this.onClose = onClose; + } + + private class SignalRWebSocketListener extends WebSocketListener { + @Override + public void onOpen(WebSocket webSocket, Response response) { + startFuture.complete(null); + } + + @Override + public void onMessage(WebSocket webSocket, String message) { + try { + onReceive.invoke(message); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void onClosing(WebSocket webSocket, int code, String reason) { + onClose.accept(code, reason); + closeFuture.complete(null); + checkStartFailure(); + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + logger.log(LogLevel.Error, "Error: %s.", t.getMessage()); + closeFuture.completeExceptionally(new RuntimeException()); + checkStartFailure(); + } + + private void checkStartFailure() { + // If the start future hasn't completed yet, then we need to complete it + // exceptionally. + if (!startFuture.isDone()) { + String errorMessage = "There was an error starting the Websockets transport."; + logger.log(LogLevel.Debug, errorMessage); + startFuture.completeExceptionally(new RuntimeException(errorMessage)); + } + } + } +} \ No newline at end of file diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Transport.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Transport.java index eafb865465..29efaf4050 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Transport.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/Transport.java @@ -6,9 +6,9 @@ package com.microsoft.aspnet.signalr; import java.util.concurrent.CompletableFuture; interface Transport { - CompletableFuture start() throws Exception; - CompletableFuture send(String message); + CompletableFuture start() throws Exception; + CompletableFuture send(String message); void setOnReceive(OnReceiveCallBack callback); void onReceive(String message) throws Exception; - CompletableFuture stop(); + CompletableFuture stop(); } diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketTransport.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketTransport.java index 35270deb04..98c4c7b84d 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketTransport.java +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketTransport.java @@ -3,67 +3,56 @@ package com.microsoft.aspnet.signalr; -import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.CompletableFuture; -import okhttp3.*; - class WebSocketTransport implements Transport { - private WebSocket websocketClient; - private SignalRWebSocketListener webSocketListener; + private WebSocketWrapper webSocketClient; private OnReceiveCallBack onReceiveCallBack; - private URI url; + private String url; private Logger logger; + private HttpClient client; private Map headers; - private OkHttpClient httpClient; - private CompletableFuture startFuture = new CompletableFuture<>(); private static final String HTTP = "http"; private static final String HTTPS = "https"; private static final String WS = "ws"; private static final String WSS = "wss"; - public WebSocketTransport(String url, Logger logger, Map headers) throws URISyntaxException { + public WebSocketTransport(String url, Map headers, HttpClient client, Logger logger) throws URISyntaxException { this.url = formatUrl(url); this.logger = logger; + this.client = client; this.headers = headers; - this.httpClient = new OkHttpClient(); } - public WebSocketTransport(String url, Logger logger, Map headers, OkHttpClient httpClient) throws URISyntaxException { - this.url = formatUrl(url); - this.logger = logger; - this.headers = headers; - this.httpClient = httpClient; - } - - public URI getUrl() { + String getUrl() { return url; } - private URI formatUrl(String url) throws URISyntaxException { + private String formatUrl(String url) throws URISyntaxException { if (url.startsWith(HTTPS)) { url = WSS + url.substring(HTTPS.length()); } else if (url.startsWith(HTTP)) { url = WS + url.substring(HTTP.length()); } - return new URI(url); + return url; } @Override - public CompletableFuture start() { - logger.log(LogLevel.Debug, "Starting Websocket connection."); - webSocketListener = new SignalRWebSocketListener(); - websocketClient = createUpdatedWebSocket(webSocketListener); - return startFuture; + public CompletableFuture start() { + logger.log(LogLevel.Debug, "Starting Websocket connection."); + this.webSocketClient = client.createWebSocket(this.url.toString(), this.headers); + this.webSocketClient.setOnReceive((message) -> onReceive(message)); + this.webSocketClient.setOnClose((code, reason) -> onClose(code, reason)); + return webSocketClient.start().thenRun(() -> logger.log(LogLevel.Information, "WebSocket transport connected to: %s.", this.url)); } @Override - public CompletableFuture send(String message) { - return CompletableFuture.runAsync(() -> websocketClient.send(message)); + public CompletableFuture send(String message) { + return webSocketClient.send(message); } @Override @@ -78,64 +67,12 @@ class WebSocketTransport implements Transport { } @Override - public CompletableFuture stop() { - return CompletableFuture.runAsync(() -> { - websocketClient.close(1000, "HubConnection stopped."); - logger.log(LogLevel.Information, "WebSocket connection stopped"); - }); + public CompletableFuture stop() { + return webSocketClient.stop().whenComplete((i, j) -> logger.log(LogLevel.Information, "WebSocket connection stopped.")); } - private WebSocket createUpdatedWebSocket(WebSocketListener webSocketListener) { - Headers.Builder headerBuilder = new Headers.Builder(); - for (String key: headers.keySet()) { - headerBuilder.add(key, headers.get(key)); - } - Request request = new Request.Builder().url(url.toString()) - .headers(headerBuilder.build()) - .build(); - - return this.httpClient.newWebSocket(request, webSocketListener); - } - - - private class SignalRWebSocketListener extends WebSocketListener { - @Override - public void onOpen(WebSocket webSocket, Response response) { - startFuture.complete(null); - logger.log(LogLevel.Information, "WebSocket transport connected to: %s", websocketClient.request().url()); - } - - @Override - public void onMessage(WebSocket webSocket, String message) { - try { - onReceive(message); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public void onClosing(WebSocket webSocket, int code, String reason) { - logger.log(LogLevel.Information, "WebSocket connection stopping with " + - "code %d and reason %s", code, reason); - // If the start future hasn't completed yet, then we need to complete it exceptionally. - checkStartFailure(); - } - - @Override - public void onFailure(WebSocket webSocket, Throwable t, Response response) { - logger.log(LogLevel.Error, "Error : %d", t.getMessage()); - // If the start future hasn't completed yet, then we need to complete it exceptionally. - checkStartFailure(); - } - } - - private void checkStartFailure() { - // If the start future hasn't completed yet, then we need to complete it exceptionally. - if (!startFuture.isDone()) { - String errorMessage = "There was an error starting the Websockets transport."; - logger.log(LogLevel.Debug, errorMessage); - startFuture.completeExceptionally(new RuntimeException(errorMessage)); - } + void onClose(int code, String reason) { + logger.log(LogLevel.Information, "WebSocket connection stopping with " + + "code %d and reason '%s'.", code, reason); } } diff --git a/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketWrapper.java b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketWrapper.java new file mode 100644 index 0000000000..56c8929d86 --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/aspnet/signalr/WebSocketWrapper.java @@ -0,0 +1,19 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.aspnet.signalr; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +abstract class WebSocketWrapper { + public abstract CompletableFuture start(); + + public abstract CompletableFuture stop(); + + public abstract CompletableFuture send(String message); + + public abstract void setOnReceive(OnReceiveCallBack onReceive); + + public abstract void setOnClose(BiConsumer onClose); +} \ No newline at end of file diff --git a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/HubConnectionTest.java b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/HubConnectionTest.java index abeff622d1..f2321f6afa 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/HubConnectionTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/HubConnectionTest.java @@ -5,9 +5,12 @@ package com.microsoft.aspnet.signalr; import static org.junit.jupiter.api.Assertions.*; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; @@ -19,7 +22,7 @@ class HubConnectionTest { @Test public void checkHubConnectionState() throws Exception { Transport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -45,7 +48,7 @@ class HubConnectionTest { @Test public void hubConnectionClosesAfterCloseMessage() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -60,7 +63,7 @@ class HubConnectionTest { @Test public void hubConnectionReceiveHandshakeResponseWithError() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); Throwable exception = assertThrows(HubException.class, () -> mockTransport.receiveMessage("{\"error\":\"Requested protocol 'messagepack' is not available.\"}" + RECORD_SEPARATOR)); @@ -71,7 +74,7 @@ class HubConnectionTest { public void registeringMultipleHandlersAndBothGetTriggered() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); hubConnection.on("inc", action); @@ -97,7 +100,7 @@ class HubConnectionTest { public void removeHandlerByName() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); hubConnection.on("inc", action); @@ -124,7 +127,7 @@ class HubConnectionTest { public void addAndRemoveHandlerImmediately() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); hubConnection.on("inc", action); @@ -149,7 +152,7 @@ class HubConnectionTest { public void removingMultipleHandlersWithOneCallToRemove() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); Action secondAction = () -> value.getAndUpdate((val) -> val + 2); @@ -181,7 +184,7 @@ class HubConnectionTest { public void removeHandlerWithUnsubscribe() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); Subscription subscription = hubConnection.on("inc", action); @@ -214,7 +217,7 @@ class HubConnectionTest { public void unsubscribeTwice() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(),true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); Subscription subscription = hubConnection.on("inc", action); @@ -248,7 +251,7 @@ class HubConnectionTest { public void removeSingleHandlerWithUnsubscribe() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); Action secondAction = () -> value.getAndUpdate((val) -> val + 2); @@ -278,7 +281,7 @@ class HubConnectionTest { public void addAndRemoveHandlerImmediatelyWithSubscribe() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action action = () -> value.getAndUpdate((val) -> val + 1); Subscription sub = hubConnection.on("inc", action); @@ -303,7 +306,7 @@ class HubConnectionTest { public void registeringMultipleHandlersThatTakeParamsAndBothGetTriggered() throws Exception { AtomicReference value = new AtomicReference<>(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); Action1 action = (number) -> value.getAndUpdate((val) -> val + number); @@ -322,7 +325,7 @@ class HubConnectionTest { @Test public void invokeWaitsForCompletionMessage() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -339,7 +342,7 @@ class HubConnectionTest { @Test public void multipleInvokesWaitForOwnCompletionMessage() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -362,7 +365,7 @@ class HubConnectionTest { @Test public void invokeWorksForPrimitiveTypes() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -380,7 +383,7 @@ class HubConnectionTest { @Test public void completionMessageCanHaveError() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -404,7 +407,7 @@ class HubConnectionTest { @Test public void stopCancelsActiveInvokes() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); @@ -429,7 +432,7 @@ class HubConnectionTest { public void sendWithNoParamsTriggersOnHandler() throws Exception { AtomicReference value = new AtomicReference<>(0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", () ->{ assertEquals(Integer.valueOf(0), value.get()); @@ -448,7 +451,7 @@ class HubConnectionTest { public void sendWithParamTriggersOnHandler() throws Exception { AtomicReference value = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param) ->{ assertNull(value.get()); @@ -470,7 +473,7 @@ class HubConnectionTest { AtomicReference value2 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2) ->{ assertNull(value1.get()); @@ -497,7 +500,7 @@ class HubConnectionTest { AtomicReference value3 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3) ->{ assertNull(value1.get()); @@ -528,7 +531,7 @@ class HubConnectionTest { AtomicReference value4 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3, param4) ->{ assertNull(value1.get()); @@ -562,7 +565,7 @@ class HubConnectionTest { AtomicReference value5 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3, param4, param5) ->{ assertNull(value1.get()); @@ -600,7 +603,7 @@ class HubConnectionTest { AtomicReference value6 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3, param4, param5, param6) -> { assertNull(value1.get()); @@ -642,7 +645,7 @@ class HubConnectionTest { AtomicReference value7 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3, param4, param5, param6, param7) -> { assertNull(value1.get()); @@ -688,7 +691,7 @@ class HubConnectionTest { AtomicReference value8 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1, param2, param3, param4, param5, param6, param7, param8) -> { assertNull(value1.get()); @@ -735,7 +738,7 @@ class HubConnectionTest { AtomicReference value1 = new AtomicReference<>(); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", (param1) -> { assertNull(value1.get()); @@ -760,7 +763,7 @@ class HubConnectionTest { public void receiveHandshakeResponseAndMessage() throws Exception { AtomicReference value = new AtomicReference(0.0); MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("inc", () ->{ assertEquals(Double.valueOf(0), value.get()); @@ -782,7 +785,7 @@ class HubConnectionTest { public void onClosedCallbackRunsWhenStopIsCalled() throws Exception { AtomicReference value1 = new AtomicReference<>(); Transport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); hubConnection.onClosed((ex) -> { assertNull(value1.get()); @@ -799,7 +802,7 @@ class HubConnectionTest { AtomicReference value1 = new AtomicReference<>(); AtomicReference value2 = new AtomicReference<>(); Transport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); hubConnection.onClosed((ex) -> { @@ -824,7 +827,7 @@ class HubConnectionTest { @Test public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.onClosed((ex) -> { assertEquals(ex.getMessage(), "There was an error"); }); @@ -841,7 +844,7 @@ class HubConnectionTest { @Test public void callingStartOnStartedHubConnectionNoOps() throws Exception { Transport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger() ,true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.start(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -855,7 +858,7 @@ class HubConnectionTest { @Test public void cannotSendBeforeStart() throws Exception { Transport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); Throwable exception = assertThrows(HubException.class, () -> hubConnection.send("inc")); @@ -865,7 +868,7 @@ class HubConnectionTest { @Test public void errorWhenReceivingInvokeWithIncorrectArgumentLength() throws Exception { MockTransport mockTransport = new MockTransport(); - HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true); + HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, new NullLogger(), true, new TestHttpClient()); hubConnection.on("Send", (s) -> { assertTrue(false); }, String.class); @@ -877,4 +880,90 @@ class HubConnectionTest { RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR)); assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage()); } + + @Test + public void negotiateSentOnStart() { + TestHttpClient client = new TestHttpClient() + .on("POST", (req) -> CompletableFuture.completedFuture(new HttpResponse(404, "", ""))); + + HubConnection hubConnection = new HubConnectionBuilder() + .withUrl("http://example.com") + .configureHttpClient(client) + .build(); + + try { + hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + } catch(Exception ex) {} + + List sentRequests = client.getSentRequests(); + assertEquals(1, sentRequests.size()); + assertEquals("http://example.com/negotiate", sentRequests.get(0).getUrl()); + } + + @Test + public void negotiateThatRedirectsForeverFailsAfter100Tries() throws InterruptedException, TimeoutException, Exception { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}"))); + + HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com") + .configureHttpClient(client).build(); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS)); + assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getMessage()); + } + + @Test + public void afterSuccessfulNegotiateConnectsWithTransport() throws InterruptedException, TimeoutException, Exception { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", + "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + MockTransport transport = new MockTransport(); + HttpConnectionOptions options = new HttpConnectionOptions(); + options.setTransport(transport); + HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options) + .configureHttpClient(client).build(); + + hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + + String[] sentMessages = transport.getSentMessages(); + assertEquals(1, sentMessages.length); + assertEquals("{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR, sentMessages[0]); + } + + @Test + public void negotiateThatReturnsErrorThrowsFromStart() { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"error\":\"Test error.\"}"))); + + MockTransport transport = new MockTransport(); + HttpConnectionOptions options = new HttpConnectionOptions(); + options.setTransport(transport); + HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options) + .configureHttpClient(client).build(); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS)); + assertEquals("Test error.", exception.getCause().getMessage()); + } + + @Test + public void negotiateRedirectIsFollowed() + throws InterruptedException, ExecutionException, TimeoutException, Exception { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) + .on("POST", "http://testexample.com/negotiate", + (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + MockTransport transport = new MockTransport(); + HttpConnectionOptions options = new HttpConnectionOptions(); + options.setTransport(transport); + HubConnection hubConnection = new HubConnectionBuilder().withUrl("http://example.com", options) + .configureHttpClient(client).build(); + + hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); + hubConnection.stop(); + } } \ No newline at end of file diff --git a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/TestHttpClient.java b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/TestHttpClient.java new file mode 100644 index 0000000000..8e831dcce5 --- /dev/null +++ b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/TestHttpClient.java @@ -0,0 +1,70 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.aspnet.signalr; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +class TestHttpClient extends HttpClient { + private Function> handler; + private List sentRequests; + + public TestHttpClient() { + this.sentRequests = new ArrayList<>(); + this.handler = (req) -> { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl()))); + return future; + }; + } + + @Override + public CompletableFuture send(HttpRequest request) { + this.sentRequests.add(request); + return this.handler.apply(request); + } + + public List getSentRequests() { + return sentRequests; + } + + public TestHttpClient on(Function> handler) { + this.handler = (req) -> handler.apply(req); + return this; + } + + public TestHttpClient on(String method, Function> handler) { + Function> oldHandler = this.handler; + this.handler = (req) -> { + if (req.getMethod().equals(method)) { + return handler.apply(req); + } + + return oldHandler.apply(req); + }; + + return this; + } + + public TestHttpClient on(String method, String url, Function> handler) { + Function> oldHandler = this.handler; + this.handler = (req) -> { + if (req.getMethod().equals(method) && req.getUrl().equals(url)) { + return handler.apply(req); + } + + return oldHandler.apply(req); + }; + + return this; + } + + @Override + public WebSocketWrapper createWebSocket(String url, Map headers) { + throw new RuntimeException("WebSockets isn't supported in testing currently."); + } +} \ No newline at end of file diff --git a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportTest.java b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportTest.java index 4a40f3bdff..9be145f536 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportTest.java @@ -6,6 +6,7 @@ package com.microsoft.aspnet.signalr; import static org.junit.jupiter.api.Assertions.*; import java.util.HashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -13,8 +14,8 @@ import org.junit.jupiter.api.Test; class WebSocketTransportTest { @Test public void WebsocketThrowsIfItCantConnect() throws Exception { - Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new NullLogger(), new HashMap<>()); - Throwable exception = assertThrows(Exception.class, () -> transport.start().get(1,TimeUnit.SECONDS)); + Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start().get(1, TimeUnit.SECONDS)); assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage()); } } diff --git a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportUrlFormatTest.java b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportUrlFormatTest.java index 15be4a89b2..9e215af183 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportUrlFormatTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/aspnet/signalr/WebSocketTransportUrlFormatTest.java @@ -25,7 +25,7 @@ class WebSocketTransportUrlFormatTest { @ParameterizedTest @MethodSource("protocols") public void checkWebsocketUrlProtocol(String url, String expectedUrl) throws URISyntaxException { - WebSocketTransport webSocketTransport = new WebSocketTransport(url, new NullLogger(), new HashMap<>()); - assertEquals(expectedUrl, webSocketTransport.getUrl().toString()); + WebSocketTransport webSocketTransport = new WebSocketTransport(url, new HashMap<>(), new TestHttpClient(), new NullLogger()); + assertEquals(expectedUrl, webSocketTransport.getUrl()); } } \ No newline at end of file diff --git a/src/Common/BinaryMessageParser.cs b/src/Common/BinaryMessageParser.cs index 0f76cc8c72..b911f65e6e 100644 --- a/src/Common/BinaryMessageParser.cs +++ b/src/Common/BinaryMessageParser.cs @@ -27,7 +27,7 @@ namespace Microsoft.AspNetCore.Internal // remaining bits (%x0000000) are the lowest bits of the value. The most significant bit of the second // byte is 0 meaning this is last byte of the VarInt. The actual value bits (%x0101001) need to be // prepended to the bits we already read so the values is %01010010000000 i.e. 0x1480 (5248) - // We support paylads up to 2GB so the biggest number we support is 7fffffff which when encoded as + // We support payloads up to 2GB so the biggest number we support is 7fffffff which when encoded as // VarInt is 0xFF 0xFF 0xFF 0xFF 0x07 - hence the maximum length prefix is 5 bytes. var length = 0U; diff --git a/src/Common/JsonUtils.cs b/src/Common/JsonUtils.cs index a5ab92ecec..4b09210210 100644 --- a/src/Common/JsonUtils.cs +++ b/src/Common/JsonUtils.cs @@ -155,7 +155,7 @@ namespace Microsoft.AspNetCore.Internal public static bool ReadForType(JsonTextReader reader, Type type) { - // Explicity read values as dates from JSON with reader. + // Explicitly read values as dates from JSON with reader. // We do this because otherwise dates are read as strings // and the JsonSerializer will use a conversion method that won't // preserve UTC in DateTime.Kind for UTC ISO8601 dates diff --git a/src/Common/Utf8BufferTextReader.cs b/src/Common/Utf8BufferTextReader.cs index 8927373370..f78c878b1d 100644 --- a/src/Common/Utf8BufferTextReader.cs +++ b/src/Common/Utf8BufferTextReader.cs @@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal reader = new Utf8BufferTextReader(); } - // Taken off the the thread static + // Taken off the thread static _cachedInstance = null; #if DEBUG if (reader._inUse) diff --git a/src/Common/Utf8BufferTextWriter.cs b/src/Common/Utf8BufferTextWriter.cs index f8592069bd..eb57dc2e36 100644 --- a/src/Common/Utf8BufferTextWriter.cs +++ b/src/Common/Utf8BufferTextWriter.cs @@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Internal writer = new Utf8BufferTextWriter(); } - // Taken off the the thread static + // Taken off the thread static _cachedInstance = null; #if DEBUG if (writer._inUse) diff --git a/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs b/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs index e45d6f70a0..80f9c88d54 100644 --- a/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs +++ b/src/Microsoft.AspNetCore.Http.Connections.Client/HttpConnection.cs @@ -529,7 +529,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Client httpMessageHandler = new AccessTokenHttpMessageHandler(httpMessageHandler, this); } - // Wrap message handler after HttpMessageHandlerFactory to ensure not overriden + // Wrap message handler after HttpMessageHandlerFactory to ensure not overridden httpMessageHandler = new LoggingHttpMessageHandler(httpMessageHandler, _loggerFactory); var httpClient = new HttpClient(httpMessageHandler); diff --git a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs index 3b2d467673..18ed1f1ee6 100644 --- a/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Http.Connections/Internal/HttpConnectionDispatcher.cs @@ -299,7 +299,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal // If the status code is a 204 it means the connection is done if (context.Response.StatusCode == StatusCodes.Status204NoContent) { - // Cancel current request to release any waiting poll and let dispose aquire the lock + // Cancel current request to release any waiting poll and let dispose acquire the lock currentRequestTcs.TrySetCanceled(); // We should be able to safely dispose because there's no more data being written @@ -312,7 +312,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal } else if (resultTask.IsFaulted) { - // Cancel current request to release any waiting poll and let dispose aquire the lock + // Cancel current request to release any waiting poll and let dispose acquire the lock currentRequestTcs.TrySetCanceled(); // transport task was faulted, we should remove the connection diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs index 70bde08eb2..9ee9fabd63 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs @@ -801,7 +801,7 @@ namespace Microsoft.AspNetCore.SignalR.Client { // Adjust consumed and examined to point to the end of the handshake // response, this handles the case where invocations are sent in the same payload - // as the the negotiate response. + // as the negotiate response. consumed = buffer.Start; examined = consumed; diff --git a/src/Microsoft.AspNetCore.SignalR.Core/IUserIdProvider.cs b/src/Microsoft.AspNetCore.SignalR.Core/IUserIdProvider.cs index 0053705598..9f7d540154 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/IUserIdProvider.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/IUserIdProvider.cs @@ -12,7 +12,7 @@ namespace Microsoft.AspNetCore.SignalR /// /// Gets the user ID for the specified connection. /// - /// The connection get get the user ID for. + /// The connection to get the user ID for. /// The user ID for the specified connection. string GetUserId(HubConnectionContext connection); } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs index 835b180665..6e1e474ca8 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs @@ -328,7 +328,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests await sseTransport.Output.WriteAsync(new byte[] { 0x42 }); - // For for send request to be in progress + // For send request to be in progress await sendSyncPoint.WaitForSyncPoint(); var stopTask = sseTransport.StopAsync(); diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Docker.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Docker.cs index aec3a9adcf..f60e557ae5 100644 --- a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Docker.cs +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/Docker.cs @@ -81,7 +81,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests // create and run docker container, remove automatically when stopped, map 6379 from the container to 6379 localhost // use static name 'redisTestContainer' so if the container doesn't get removed we don't keep adding more // use redis base docker image - // 20 second timeout to allow redis image to be downloaded, should be a rare occurance, only happening when a new version is released + // 20 second timeout to allow redis image to be downloaded, should be a rare occurrence, only happening when a new version is released RunProcessAndThrowIfFailed(_path, $"run --rm -p 6379:6379 --name {_dockerContainerName} -d redis", "redis", logger, TimeSpan.FromSeconds(20)); // inspect the redis docker image and extract the IPAddress. Necessary when running tests from inside a docker container, spinning up a new docker container for redis