From f7d10bec0273b1282ac02798e453a38bcde3f7ed Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 23 Oct 2018 10:52:26 -0700 Subject: [PATCH] [Java] Plumb RxJava through client (#3148) --- .../microsoft/signalr/DefaultHttpClient.java | 13 +- .../com/microsoft/signalr/HttpClient.java | 17 +- .../com/microsoft/signalr/HubConnection.java | 106 +++++----- .../microsoft/signalr/InvocationRequest.java | 17 +- .../signalr/OkHttpWebSocketWrapper.java | 31 +-- .../java/com/microsoft/signalr/Transport.java | 9 +- .../microsoft/signalr/WebSocketTransport.java | 13 +- .../microsoft/signalr/WebSocketWrapper.java | 9 +- .../microsoft/signalr/HubConnectionTest.java | 190 +++++++++--------- .../com/microsoft/signalr/MockTransport.java | 28 ++- .../com/microsoft/signalr/TestHttpClient.java | 21 +- .../signalr/WebSocketTransportTest.java | 5 +- 12 files changed, 242 insertions(+), 217 deletions(-) diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index 17d0526215..66675215e4 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -7,10 +7,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.reactivex.Single; +import io.reactivex.subjects.SingleSubject; import okhttp3.Call; import okhttp3.Callback; import okhttp3.Cookie; @@ -78,7 +79,7 @@ final class DefaultHttpClient extends HttpClient { } @Override - public CompletableFuture send(HttpRequest httpRequest) { + public Single send(HttpRequest httpRequest) { Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl()); switch (httpRequest.getMethod()) { @@ -100,24 +101,24 @@ final class DefaultHttpClient extends HttpClient { Request request = requestBuilder.build(); - CompletableFuture responseFuture = new CompletableFuture<>(); + SingleSubject responseSubject = SingleSubject.create(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { - responseFuture.completeExceptionally(e.getCause()); + responseSubject.onError(e.getCause()); } @Override public void onResponse(Call call, Response response) throws IOException { try (ResponseBody body = response.body()) { HttpResponse httpResponse = new HttpResponse(response.code(), response.message(), body.string()); - responseFuture.complete(httpResponse); + responseSubject.onSuccess(httpResponse); } } }); - return responseFuture; + return responseSubject; } @Override diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java index 4352fe825a..9b7a1b4fde 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java @@ -5,7 +5,8 @@ package com.microsoft.signalr; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; + +import io.reactivex.Single; class HttpRequest { private String method; @@ -74,46 +75,46 @@ class HttpResponse { } abstract class HttpClient { - public CompletableFuture get(String url) { + public Single get(String url) { HttpRequest request = new HttpRequest(); request.setUrl(url); request.setMethod("GET"); return this.send(request); } - public CompletableFuture get(String url, HttpRequest options) { + public Single get(String url, HttpRequest options) { options.setUrl(url); options.setMethod("GET"); return this.send(options); } - public CompletableFuture post(String url) { + public Single post(String url) { HttpRequest request = new HttpRequest(); request.setUrl(url); request.setMethod("POST"); return this.send(request); } - public CompletableFuture post(String url, HttpRequest options) { + public Single post(String url, HttpRequest options) { options.setUrl(url); options.setMethod("POST"); return this.send(options); } - public CompletableFuture delete(String url) { + public Single delete(String url) { HttpRequest request = new HttpRequest(); request.setUrl(url); request.setMethod("DELETE"); return this.send(request); } - public CompletableFuture delete(String url, HttpRequest options) { + public Single delete(String url, HttpRequest options) { options.setUrl(url); options.setMethod("DELETE"); return this.send(options); } - public abstract CompletableFuture send(HttpRequest request); + public abstract Single send(HttpRequest request); public abstract WebSocketWrapper createWebSocket(String url, Map headers); } \ No newline at end of file diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 101b40362c..1df62674e4 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory; import io.reactivex.Completable; import io.reactivex.Single; +import io.reactivex.subjects.CompletableSubject; +import io.reactivex.subjects.SingleSubject; public class HubConnection { private static final String RECORD_SEPARATOR = "\u001e"; @@ -49,7 +51,7 @@ public class HubConnection { private long keepAliveInterval = 15*1000; private long serverTimeout = 30*1000; private long tickRate = 1000; - private CompletableFuture handshakeResponseFuture; + private CompletableSubject handshakeResponseSubject; private long handshakeResponseTimeout = 15*1000; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); @@ -140,18 +142,18 @@ public class HubConnection { handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString); } catch (RuntimeException ex) { RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex); - handshakeResponseFuture.completeExceptionally(exception); + handshakeResponseSubject.onError(exception); throw exception; } if (handshakeResponse.getHandshakeError() != null) { String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError(); logger.error(errorMessage); RuntimeException exception = new RuntimeException(errorMessage); - handshakeResponseFuture.completeExceptionally(exception); + handshakeResponseSubject.onError(exception); throw exception; } handshakeReceived = true; - handshakeResponseFuture.complete(null); + handshakeResponseSubject.onComplete(); payload = payload.substring(handshakeLength); // The payload only contained the handshake response so we can return. @@ -206,15 +208,21 @@ public class HubConnection { private void timeoutHandshakeResponse(long timeout, TimeUnit unit) { ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); - scheduledThreadPool.schedule(() -> handshakeResponseFuture.completeExceptionally( - new TimeoutException("Timed out waiting for the server to respond to the handshake message.")), timeout, unit); + scheduledThreadPool.schedule(() -> { + // If onError is called on a completed subject the global error handler is called + if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) + { + handshakeResponseSubject.onError( + new TimeoutException("Timed out waiting for the server to respond to the handshake message.")); + } + }, timeout, unit); } - private CompletableFuture handleNegotiate(String url) { + private Single handleNegotiate(String url) { HttpRequest request = new HttpRequest(); request.addHeaders(this.headers); - return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).thenCompose((response) -> { + return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).map((response) -> { if (response.getStatusCode() != 200) { throw new RuntimeException(String.format("Unexpected status code returned from negotiate: %d %s.", response.getStatusCode(), response.getStatusText())); } @@ -233,7 +241,7 @@ public class HubConnection { this.headers.put("Authorization", "Bearer " + token); } - return CompletableFuture.completedFuture(negotiateResponse); + return negotiateResponse; }); } @@ -256,25 +264,27 @@ public class HubConnection { return Completable.complete(); } - handshakeResponseFuture = new CompletableFuture<>(); + handshakeResponseSubject = CompletableSubject.create(); handshakeReceived = false; - CompletableFuture tokenFuture = new CompletableFuture<>(); + CompletableSubject tokenCompletable = CompletableSubject.create(); accessTokenProvider.subscribe(token -> { if (token != null && !token.isEmpty()) { this.headers.put("Authorization", "Bearer " + token); } - tokenFuture.complete(null); + tokenCompletable.onComplete(); }); stopError = null; - CompletableFuture negotiate = null; + Single negotiate = null; if (!skipNegotiate) { - negotiate = tokenFuture.thenCompose((v) -> startNegotiate(baseUrl, 0)); + negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0))); } else { - negotiate = tokenFuture.thenCompose((v) -> CompletableFuture.completedFuture(baseUrl)); + negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(baseUrl))); } - return Completable.fromFuture(negotiate.thenCompose(url -> { + CompletableSubject start = CompletableSubject.create(); + + negotiate.flatMapCompletable(url -> { logger.debug("Starting HubConnection."); if (transport == null) { transport = new WebSocketTransport(headers, httpClient); @@ -283,16 +293,17 @@ public class HubConnection { transport.setOnReceive(this.callback); transport.setOnClose((message) -> stopConnection(message)); - return transport.start(url).thenCompose((future) -> { + return transport.start(url).andThen(Completable.defer(() -> { String handshake = HandshakeProtocol.createHandshakeRequestMessage( new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); - return transport.send(handshake).thenCompose((innerFuture) -> { + + return transport.send(handshake).andThen(Completable.defer(() -> { timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS); - return handshakeResponseFuture.thenRun(() -> { + return handshakeResponseSubject.andThen(Completable.defer(() -> { hubConnectionStateLock.lock(); try { - hubConnectionState = HubConnectionState.CONNECTED; connectionState = new ConnectionState(this); + hubConnectionState = HubConnectionState.CONNECTED; logger.info("HubConnection started."); resetServerTimeout(); @@ -320,18 +331,23 @@ public class HubConnection { } finally { hubConnectionStateLock.unlock(); } - }); - }); - }); - })); + + return Completable.complete(); + })); + })); + })); + // subscribe makes this a "hot" completable so this runs immediately + }).subscribeWith(start); + + return start; } - private CompletableFuture startNegotiate(String url, int negotiateAttempts) { + private Single startNegotiate(String url, int negotiateAttempts) { if (hubConnectionState != HubConnectionState.DISCONNECTED) { - return CompletableFuture.completedFuture(null); + return Single.just(null); } - return handleNegotiate(url).thenCompose((response) -> { + return handleNegotiate(url).flatMap(response -> { if (response.getRedirectUrl() != null && negotiateAttempts >= MAX_NEGOTIATE_ATTEMPTS) { throw new RuntimeException("Negotiate redirection limit exceeded."); } @@ -350,7 +366,7 @@ public class HubConnection { } } - return CompletableFuture.completedFuture(finalUrl); + return Single.just(finalUrl); } return startNegotiate(response.getRedirectUrl(), negotiateAttempts + 1); @@ -363,11 +379,11 @@ public class HubConnection { * @param errorMessage An error message if the connected needs to be stopped because of an error. * @return A Completable that completes when the connection has been stopped. */ - private CompletableFuture stop(String errorMessage) { + private Completable stop(String errorMessage) { hubConnectionStateLock.lock(); try { if (hubConnectionState == HubConnectionState.DISCONNECTED) { - return CompletableFuture.completedFuture(null); + return Completable.complete(); } if (errorMessage != null) { @@ -389,7 +405,7 @@ public class HubConnection { * @return A Completable that completes when the connection has been stopped. */ public Completable stop() { - return Completable.fromFuture(stop(null)); + return stop(null); } private void stopConnection(String errorMessage) { @@ -409,7 +425,7 @@ public class HubConnection { connectionState = null; logger.info("HubConnection stopped."); hubConnectionState = HubConnectionState.DISCONNECTED; - handshakeResponseFuture.complete(null); + handshakeResponseSubject.onComplete(); } finally { hubConnectionStateLock.unlock(); } @@ -452,31 +468,27 @@ public class HubConnection { String id = connectionState.getNextInvocationId(); InvocationMessage invocationMessage = new InvocationMessage(id, method, args); - CompletableFuture future = new CompletableFuture<>(); + SingleSubject subject = SingleSubject.create(); InvocationRequest irq = new InvocationRequest(returnType, id); connectionState.addInvocation(irq); // forward the invocation result or error to the user // run continuations on a separate thread - CompletableFuture pendingCall = irq.getPendingCall(); - pendingCall.whenCompleteAsync((result, error) -> { - if (error == null) { - // Primitive types can't be cast with the Class cast function - if (returnType.isPrimitive()) { - future.complete((T)result); - } else { - future.complete(returnType.cast(result)); - } + Single pendingCall = irq.getPendingCall(); + pendingCall.subscribe(result -> { + // Primitive types can't be cast with the Class cast function + if (returnType.isPrimitive()) { + subject.onSuccess((T)result); } else { - future.completeExceptionally(error); + subject.onSuccess(returnType.cast(result)); } - }); + }, error -> subject.onError(error)); - // Make sure the actual send is after setting up the future otherwise there is a race - // where the map doesn't have the future yet when the response is returned + // Make sure the actual send is after setting up the callbacks otherwise there is a race + // where the map doesn't have the callbacks yet when the response is returned sendHubMessage(invocationMessage); - return Single.fromFuture(future); + return subject; } private void sendHubMessage(HubMessage message) { diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java index d4fa34c66e..3d12d39d04 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java @@ -3,11 +3,14 @@ package com.microsoft.signalr; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CancellationException; + +import io.reactivex.Single; +import io.reactivex.subjects.SingleSubject; class InvocationRequest { private final Class returnType; - private final CompletableFuture pendingCall = new CompletableFuture<>(); + private final SingleSubject pendingCall = SingleSubject.create(); private final String invocationId; InvocationRequest(Class returnType, String invocationId) { @@ -17,21 +20,21 @@ class InvocationRequest { public void complete(CompletionMessage completion) { if (completion.getResult() != null) { - pendingCall.complete(completion.getResult()); + pendingCall.onSuccess(completion.getResult()); } else { - pendingCall.completeExceptionally(new HubException(completion.getError())); + pendingCall.onError(new HubException(completion.getError())); } } public void fail(Exception ex) { - pendingCall.completeExceptionally(ex); + pendingCall.onError(ex); } public void cancel() { - pendingCall.cancel(false); + pendingCall.onError(new CancellationException("Invocation was canceled.")); } - public CompletableFuture getPendingCall() { + public Single getPendingCall() { return pendingCall; } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/OkHttpWebSocketWrapper.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/OkHttpWebSocketWrapper.java index 69d4b16027..1f5710afa9 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/OkHttpWebSocketWrapper.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/OkHttpWebSocketWrapper.java @@ -4,12 +4,13 @@ package com.microsoft.signalr; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.reactivex.Completable; +import io.reactivex.subjects.CompletableSubject; import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -24,8 +25,8 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper { private OkHttpClient client; private OnReceiveCallBack onReceive; private BiConsumer onClose; - private CompletableFuture startFuture = new CompletableFuture<>(); - private CompletableFuture closeFuture = new CompletableFuture<>(); + private CompletableSubject startSubject = CompletableSubject.create(); + private CompletableSubject closeSubject = CompletableSubject.create(); private final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketWrapper.class); @@ -36,7 +37,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper { } @Override - public CompletableFuture start() { + public Completable start() { Headers.Builder headerBuilder = new Headers.Builder(); for (String key : headers.keySet()) { headerBuilder.add(key, headers.get(key)); @@ -48,19 +49,19 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper { .build(); this.websocketClient = client.newWebSocket(request, new SignalRWebSocketListener()); - return startFuture; + return startSubject; } @Override - public CompletableFuture stop() { + public Completable stop() { websocketClient.close(1000, "HubConnection stopped."); - return closeFuture; + return closeSubject; } @Override - public CompletableFuture send(String message) { + public Completable send(String message) { websocketClient.send(message); - return CompletableFuture.completedFuture(null); + return Completable.complete(); } @Override @@ -76,7 +77,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper { private class SignalRWebSocketListener extends WebSocketListener { @Override public void onOpen(WebSocket webSocket, Response response) { - startFuture.complete(null); + startSubject.onComplete(); } @Override @@ -92,25 +93,25 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper { @Override public void onClosing(WebSocket webSocket, int code, String reason) { onClose.accept(code, reason); - closeFuture.complete(null); + closeSubject.onComplete(); checkStartFailure(); } @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { logger.error("Websocket closed from an error: {}.", t.getMessage()); - closeFuture.completeExceptionally(new RuntimeException(t)); + closeSubject.onError(new RuntimeException(t)); onClose.accept(null, t.getMessage()); checkStartFailure(); } private void checkStartFailure() { - // If the start future hasn't completed yet, then we need to complete it + // If the start task hasn't completed yet, then we need to complete it // exceptionally. - if (!startFuture.isDone()) { + if (!startSubject.hasComplete()) { String errorMessage = "There was an error starting the Websockets transport."; logger.error("Websocket closed from an error: {}.", errorMessage); - startFuture.completeExceptionally(new RuntimeException(errorMessage)); + startSubject.onError(new RuntimeException(errorMessage)); } } } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/Transport.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/Transport.java index 04b7085df5..bfa24578f3 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/Transport.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/Transport.java @@ -3,14 +3,15 @@ package com.microsoft.signalr; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import io.reactivex.Completable; + interface Transport { - CompletableFuture start(String url); - CompletableFuture send(String message); + Completable start(String url); + Completable send(String message); void setOnReceive(OnReceiveCallBack callback); void onReceive(String message); void setOnClose(Consumer onCloseCallback); - CompletableFuture stop(); + Completable stop(); } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketTransport.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketTransport.java index 9f6abc87a6..230914fccb 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketTransport.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketTransport.java @@ -4,12 +4,13 @@ package com.microsoft.signalr; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.reactivex.Completable; + class WebSocketTransport implements Transport { private WebSocketWrapper webSocketClient; private OnReceiveCallBack onReceiveCallBack; @@ -46,7 +47,7 @@ class WebSocketTransport implements Transport { } @Override - public CompletableFuture start(String url) { + public Completable start(String url) { this.url = formatUrl(url); logger.debug("Starting Websocket connection."); this.webSocketClient = client.createWebSocket(this.url, this.headers); @@ -57,11 +58,11 @@ class WebSocketTransport implements Transport { } }); - return webSocketClient.start().thenRun(() -> logger.info("WebSocket transport connected to: {}.", this.url)); + return webSocketClient.start().doOnComplete(() -> logger.info("WebSocket transport connected to: {}.", this.url)); } @Override - public CompletableFuture send(String message) { + public Completable send(String message) { return webSocketClient.send(message); } @@ -82,8 +83,8 @@ class WebSocketTransport implements Transport { } @Override - public CompletableFuture stop() { - return webSocketClient.stop().whenComplete((i, j) -> logger.info("WebSocket connection stopped.")); + public Completable stop() { + return webSocketClient.stop().doOnEvent(t -> logger.info("WebSocket connection stopped.")); } void onClose(int code, String reason) { diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketWrapper.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketWrapper.java index 6b0f4a6011..064dc3eaab 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketWrapper.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/WebSocketWrapper.java @@ -3,15 +3,16 @@ package com.microsoft.signalr; -import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; +import io.reactivex.Completable; + abstract class WebSocketWrapper { - public abstract CompletableFuture start(); + public abstract Completable start(); - public abstract CompletableFuture stop(); + public abstract Completable stop(); - public abstract CompletableFuture send(String message); + public abstract Completable send(String message); public abstract void setOnReceive(OnReceiveCallBack onReceive); diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index 828fbdc4cb..bd14c0a7a5 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -7,7 +7,6 @@ import static org.junit.jupiter.api.Assertions.*; import java.util.List; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -16,8 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import io.reactivex.Completable; import io.reactivex.Single; +import io.reactivex.subjects.SingleSubject; class HubConnectionTest { private static final String RECORD_SEPARATOR = "\u001e"; @@ -25,7 +24,7 @@ class HubConnectionTest { @Test public void checkHubConnectionState() { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); @@ -36,7 +35,7 @@ class HubConnectionTest { public void transportCloseTriggersStopInHubConnection() { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); mockTransport.stop(); @@ -54,7 +53,7 @@ class HubConnectionTest { message.set(error.getMessage()); }); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); mockTransport.stopWithError(errorMessage); assertEquals(errorMessage, message.get()); @@ -69,10 +68,9 @@ class HubConnectionTest { .shouldSkipNegotiate(true) .withHandshakeResponseTimeout(100) .build(); - Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); - assertEquals(ExecutionException.class, exception.getCause().getClass()); - assertEquals(TimeoutException.class, exception.getCause().getCause().getClass()); - assertEquals(exception.getCause().getCause().getMessage(), "Timed out waiting for the server to respond to the handshake message."); + Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals(TimeoutException.class, exception.getCause().getClass()); + assertEquals("Timed out waiting for the server to respond to the handshake message.", exception.getCause().getMessage()); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -81,7 +79,7 @@ class HubConnectionTest { Transport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); @@ -93,7 +91,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -108,6 +106,7 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); + mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait(); Throwable exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{" + RECORD_SEPARATOR)); assertEquals("An invalid handshake response was received from the server.", exception.getMessage()); @@ -120,6 +119,7 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); + mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait(); Throwable exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"error\":\"Requested protocol 'messagepack' is not available.\"}" + RECORD_SEPARATOR)); assertEquals("Error in handshake Requested protocol 'messagepack' is not available.", exception.getMessage()); } @@ -136,7 +136,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -160,7 +160,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -187,7 +187,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -212,7 +212,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -241,7 +241,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -273,7 +273,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -308,7 +308,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String message = mockTransport.getSentMessages()[0]; String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; @@ -336,7 +336,7 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); try { mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); @@ -360,7 +360,7 @@ class HubConnectionTest { hubConnection.on("add", action, Double.class); assertEquals(Double.valueOf(0), value.get()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -372,7 +372,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(Integer.class, "echo", "message"); @@ -390,7 +390,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); AtomicBoolean doneFirst = new AtomicBoolean(); AtomicBoolean doneSecond = new AtomicBoolean(); @@ -416,7 +416,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); AtomicBoolean done = new AtomicBoolean(); // int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type @@ -435,7 +435,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(int.class, "echo", "message"); @@ -448,11 +448,11 @@ class HubConnectionTest { try { result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet(); assertFalse(true); - } catch (Exception ex) { - exceptionMessage = ex.getCause().getMessage(); + } catch (HubException ex) { + exceptionMessage = ex.getMessage(); } - assertEquals("com.microsoft.signalr.HubException: There was an error", exceptionMessage); + assertEquals("There was an error", exceptionMessage); } @Test @@ -460,7 +460,7 @@ class HubConnectionTest { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(int.class, "echo", "message"); @@ -469,15 +469,15 @@ class HubConnectionTest { hubConnection.stop(); - boolean hasException = false; + RuntimeException hasException = null; try { result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet(); assertFalse(true); } catch (CancellationException ex) { - hasException = true; + hasException = ex; } - assertTrue(hasException); + assertEquals("Invocation was canceled.", hasException.getMessage()); } @Test @@ -491,7 +491,7 @@ class HubConnectionTest { value.getAndUpdate((val) -> val + 1); }); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -509,7 +509,7 @@ class HubConnectionTest { value.set(param); }, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}" + RECORD_SEPARATOR); hubConnection.send("inc", "Hello World"); @@ -533,7 +533,7 @@ class HubConnectionTest { value2.set(param2); }, String.class, Double.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\", 12]}" + RECORD_SEPARATOR); hubConnection.send("inc", "Hello World", 12); @@ -561,7 +561,7 @@ class HubConnectionTest { value3.set(param3); }, String.class, String.class, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\"]}" + RECORD_SEPARATOR); hubConnection.send("inc", "A", "B", "C"); @@ -593,7 +593,7 @@ class HubConnectionTest { value4.set(param4); }, String.class, String.class, String.class, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\", \"D\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -628,7 +628,7 @@ class HubConnectionTest { value5.set(param5); }, String.class, String.class, String.class, Boolean.class, Double.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12 ]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -667,7 +667,7 @@ class HubConnectionTest { value6.set(param6); }, String.class, String.class, String.class, Boolean.class, Double.class, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -710,7 +710,7 @@ class HubConnectionTest { value7.set(param7); }, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -757,7 +757,7 @@ class HubConnectionTest { value8.set(param8); }, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class, String.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\",\"F\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. assertEquals("A", value1.get()); @@ -789,7 +789,7 @@ class HubConnectionTest { value1.set(param1); }, Custom.class); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[{\"number\":1,\"str\":\"A\",\"bools\":[true,false]}]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -802,18 +802,19 @@ class HubConnectionTest { } @Test - public void receiveHandshakeResponseAndMessage() { + public void receiveHandshakeResponseAndMessage() { AtomicReference value = new AtomicReference(0.0); MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.on("inc", () ->{ + hubConnection.on("inc", () -> { assertEquals(Double.valueOf(0), value.get()); value.getAndUpdate((val) -> val + 1); }); // On start we're going to receive the handshake response and also an invocation in the same payload. hubConnection.start(); + mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait(); String expectedSentMessage = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; assertEquals(expectedSentMessage, mockTransport.getSentMessages()[0]); @@ -827,7 +828,7 @@ class HubConnectionTest { public void onClosedCallbackRunsWhenStopIsCalled() { AtomicReference value1 = new AtomicReference<>(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); hubConnection.onClosed((ex) -> { assertNull(value1.get()); value1.set("Closed callback ran."); @@ -843,7 +844,7 @@ class HubConnectionTest { AtomicReference value1 = new AtomicReference<>(); AtomicReference value2 = new AtomicReference<>(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); hubConnection.onClosed((ex) -> { assertNull(value1.get()); @@ -871,7 +872,7 @@ class HubConnectionTest { hubConnection.onClosed((ex) -> { assertEquals(ex.getMessage(), "There was an error"); }); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -883,10 +884,10 @@ class HubConnectionTest { @Test public void callingStartOnStartedHubConnectionNoOps() { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); @@ -910,9 +911,8 @@ class HubConnectionTest { assertTrue(false); }, String.class); - Completable startFuture = hubConnection.start(); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); - startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS); RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR)); assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage()); } @@ -920,16 +920,15 @@ class HubConnectionTest { @Test public void negotiateSentOnStart() { TestHttpClient client = new TestHttpClient() - .on("POST", (req) -> CompletableFuture.completedFuture(new HttpResponse(404, "", ""))); + .on("POST", (req) -> Single.just(new HttpResponse(404, "", ""))); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") .withHttpClient(client) .build(); - try { - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); - } catch(Exception ex) {} + Exception exception = assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals("Unexpected status code returned from negotiate: 404 .", exception.getMessage()); List sentRequests = client.getSentRequests(); assertEquals(1, sentRequests.size()); @@ -939,7 +938,7 @@ class HubConnectionTest { @Test public void negotiateThatRedirectsForeverFailsAfter100Tries() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", - (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}"))); + (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}"))); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") @@ -947,14 +946,14 @@ class HubConnectionTest { .build(); RuntimeException exception = assertThrows(RuntimeException.class, - () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); - assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getCause().getMessage()); + () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals("Negotiate redirection limit exceeded.", exception.getMessage()); } @Test public void afterSuccessfulNegotiateConnectsWithTransport() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", - (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", + (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); @@ -965,7 +964,7 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); String[] sentMessages = transport.getSentMessages(); assertEquals(1, sentMessages.length); @@ -975,7 +974,7 @@ class HubConnectionTest { @Test public void negotiateThatReturnsErrorThrowsFromStart() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", - (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"error\":\"Test error.\"}"))); + (req) -> Single.just(new HttpResponse(200, "", "{\"error\":\"Test error.\"}"))); MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder @@ -985,16 +984,16 @@ class HubConnectionTest { .build(); RuntimeException exception = assertThrows(RuntimeException.class, - () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); - assertEquals("Test error.", exception.getCause().getCause().getMessage()); + () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals("Test error.", exception.getMessage()); } @Test public void negotiateRedirectIsFollowed() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", - (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) + (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) .on("POST", "http://testexample.com/negotiate", - (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); MockTransport transport = new MockTransport(true); @@ -1004,7 +1003,7 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); } @@ -1016,8 +1015,7 @@ class HubConnectionTest { .on("POST", "http://example.com/negotiate", (req) -> { token.set(req.getHeaders().get("Authorization")); - return CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")); }); @@ -1029,7 +1027,7 @@ class HubConnectionTest { .withAccessTokenProvider(Single.just("secretToken")) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); assertEquals("Bearer secretToken", token.get()); @@ -1039,11 +1037,10 @@ class HubConnectionTest { public void accessTokenProviderIsOverriddenFromRedirectNegotiate() { AtomicReference token = new AtomicReference<>(); TestHttpClient client = new TestHttpClient() - .on("POST", "http://example.com/negotiate", (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"newToken\"}"))) + .on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"newToken\"}"))) .on("POST", "http://testexample.com/negotiate", (req) -> { token.set(req.getHeaders().get("Authorization")); - return CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")); }); @@ -1055,7 +1052,7 @@ class HubConnectionTest { .withAccessTokenProvider(Single.just("secretToken")) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl()); hubConnection.stop(); @@ -1067,14 +1064,14 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); hubConnection.setServerTimeout(1); hubConnection.setTickRate(1); - CompletableFuture closedFuture = new CompletableFuture<>(); + SingleSubject closedSubject = SingleSubject.create(); hubConnection.onClosed((e) -> { - closedFuture.complete(e); + closedSubject.onSuccess(e); }); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); - assertEquals("Server timeout elapsed without receiving a message from the server.", closedFuture.get(1000, TimeUnit.MILLISECONDS).getMessage()); + assertEquals("Server timeout elapsed without receiving a message from the server.", closedSubject.timeout(1, TimeUnit.SECONDS).blockingGet().getMessage()); } @Test @@ -1084,7 +1081,7 @@ class HubConnectionTest { hubConnection.setKeepAliveInterval(1); hubConnection.setTickRate(1); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); TimeUnit.MILLISECONDS.sleep(100); hubConnection.stop(); @@ -1103,8 +1100,7 @@ class HubConnectionTest { .on("POST", "http://example.com/negotiate", (req) -> { header.set(req.getHeaders().get("ExampleHeader")); - return CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")); }); @@ -1116,7 +1112,7 @@ class HubConnectionTest { .withHeader("ExampleHeader", "ExampleValue") .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); assertEquals("ExampleValue", header.get()); @@ -1129,8 +1125,7 @@ class HubConnectionTest { .on("POST", "http://example.com/negotiate", (req) -> { header.set(req.getHeaders().get("ExampleHeader")); - return CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")); }); @@ -1143,7 +1138,7 @@ class HubConnectionTest { .withHeader("ExampleHeader", "New Value") .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); assertEquals("New Value", header.get()); @@ -1158,13 +1153,13 @@ class HubConnectionTest { .shouldSkipNegotiate(true) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -1172,10 +1167,8 @@ class HubConnectionTest { public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() { MockTransport mockTransport = new MockTransport(); TestHttpClient client = new TestHttpClient() - .on("POST", "http://example.com/negotiate", (req) -> CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) - .on("POST", "http://testexample.com/negotiate", (req) -> CompletableFuture - .completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + .on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) + .on("POST", "http://testexample.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); HubConnection hubConnection = HubConnectionBuilder @@ -1184,13 +1177,13 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); - hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -1198,8 +1191,7 @@ class HubConnectionTest { public void non200FromNegotiateThrowsError() { TestHttpClient client = new TestHttpClient() .on("POST", "http://example.com/negotiate", - (req) -> CompletableFuture - .completedFuture(new HttpResponse(500, "Internal server error", ""))); + (req) -> Single.just(new HttpResponse(500, "Internal server error", ""))); MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder @@ -1209,7 +1201,7 @@ class HubConnectionTest { .build(); RuntimeException exception = assertThrows(RuntimeException.class, - () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); - assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getCause().getMessage()); + () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getMessage()); } } \ No newline at end of file diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java index 9033861b51..54a61a3898 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java @@ -4,9 +4,11 @@ package com.microsoft.signalr; import java.util.ArrayList; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import io.reactivex.Completable; +import io.reactivex.subjects.CompletableSubject; + class MockTransport implements Transport { private OnReceiveCallBack onReceiveCallBack; private ArrayList sentMessages = new ArrayList<>(); @@ -14,6 +16,8 @@ class MockTransport implements Transport { private Consumer onClose; final private boolean ignorePings; final private boolean autoHandshake; + final private CompletableSubject startSubject = CompletableSubject.create(); + final private CompletableSubject stopSubject = CompletableSubject.create(); private static final String RECORD_SEPARATOR = "\u001e"; @@ -31,7 +35,7 @@ class MockTransport implements Transport { } @Override - public CompletableFuture start(String url) { + public Completable start(String url) { this.url = url; if (autoHandshake) { try { @@ -40,15 +44,16 @@ class MockTransport implements Transport { throw new RuntimeException(e); } } - return CompletableFuture.completedFuture(null); + startSubject.onComplete(); + return startSubject; } @Override - public CompletableFuture send(String message) { + public Completable send(String message) { if (!(ignorePings && message.equals("{\"type\":6}" + RECORD_SEPARATOR))) { sentMessages.add(message); } - return CompletableFuture.completedFuture(null); + return Completable.complete(); } @Override @@ -67,9 +72,10 @@ class MockTransport implements Transport { } @Override - public CompletableFuture stop() { + public Completable stop() { onClose.accept(null); - return CompletableFuture.completedFuture(null); + stopSubject.onComplete(); + return stopSubject; } public void stopWithError(String errorMessage) { @@ -87,4 +93,12 @@ class MockTransport implements Transport { public String getUrl() { return this.url; } + + public Completable getStartTask() { + return startSubject; + } + + public Completable getStopTask() { + return stopSubject; + } } diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java index 056942034a..166ac43bf0 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java @@ -6,24 +6,23 @@ package com.microsoft.signalr; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import io.reactivex.Single; + class TestHttpClient extends HttpClient { - private Function> handler; + private Function> handler; private List sentRequests; public TestHttpClient() { this.sentRequests = new ArrayList<>(); this.handler = (req) -> { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl()))); - return future; + return Single.error(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl()))); }; } @Override - public CompletableFuture send(HttpRequest request) { + public Single send(HttpRequest request) { this.sentRequests.add(request); return this.handler.apply(request); } @@ -32,13 +31,13 @@ class TestHttpClient extends HttpClient { return sentRequests; } - public TestHttpClient on(Function> handler) { + public TestHttpClient on(Function> handler) { this.handler = (req) -> handler.apply(req); return this; } - public TestHttpClient on(String method, Function> handler) { - Function> oldHandler = this.handler; + public TestHttpClient on(String method, Function> handler) { + Function> oldHandler = this.handler; this.handler = (req) -> { if (req.getMethod().equals(method)) { return handler.apply(req); @@ -50,8 +49,8 @@ class TestHttpClient extends HttpClient { return this; } - public TestHttpClient on(String method, String url, Function> handler) { - Function> oldHandler = this.handler; + public TestHttpClient on(String method, String url, Function> handler) { + Function> oldHandler = this.handler; this.handler = (req) -> { if (req.getMethod().equals(method) && req.getUrl().equals(url)) { return handler.apply(req); diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java index 96f4de196e..9761412692 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java @@ -6,7 +6,6 @@ package com.microsoft.signalr; import static org.junit.jupiter.api.Assertions.*; import java.util.HashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -15,7 +14,7 @@ class WebSocketTransportTest { @Test public void WebsocketThrowsIfItCantConnect() { Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient()); - ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS)); - assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage()); + RuntimeException exception = assertThrows(RuntimeException.class, () -> transport.start("http://www.example.com").blockingAwait(1, TimeUnit.SECONDS)); + assertEquals("There was an error starting the Websockets transport.", exception.getMessage()); } }