diff --git a/.vscode/launch.json b/.vscode/launch.json index 8a35acc3ed..cc0f2cbe68 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,7 +8,7 @@ "cwd": "${workspaceFolder}/clients/java/", "console": "externalTerminal", "stopOnEntry": false, - "mainClass": "com.microsoft.aspnet.signalr.sample.Chat", + "mainClass": "com.microsoft.signalr.sample.Chat", "args": "" }, { diff --git a/clients/java/signalr/build.gradle b/clients/java/signalr/build.gradle index 648de83eb6..f48f06f330 100644 --- a/clients/java/signalr/build.gradle +++ b/clients/java/signalr/build.gradle @@ -33,6 +33,7 @@ dependencies { testRuntime 'org.junit.jupiter:junit-jupiter-engine:5.3.1' implementation 'com.google.code.gson:gson:2.8.5' implementation 'com.squareup.okhttp3:okhttp:3.11.0' + implementation 'io.reactivex.rxjava2:rxjava:2.2.2' } spotless { diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java index 4bb4d6b51e..c2594826a8 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java @@ -6,13 +6,15 @@ package com.microsoft.signalr; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import io.reactivex.Single; + public class HttpHubConnectionBuilder { private String url; private Transport transport; private Logger logger; private HttpClient httpClient; private boolean skipNegotiate; - private Supplier> accessTokenProvider; + private Single accessTokenProvider; HttpHubConnectionBuilder(String url) { this.url = url; @@ -39,7 +41,7 @@ public class HttpHubConnectionBuilder { return this; } - public HttpHubConnectionBuilder withAccessTokenProvider(Supplier> accessTokenProvider) { + public HttpHubConnectionBuilder withAccessTokenProvider(Single accessTokenProvider) { this.accessTokenProvider = accessTokenProvider; return this; } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index fdb252f05c..9d926023cc 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -9,12 +9,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import java.util.function.Supplier; + +import io.reactivex.Completable; +import io.reactivex.Single; public class HubConnection { private static final String RECORD_SEPARATOR = "\u001e"; @@ -32,13 +33,13 @@ public class HubConnection { private Logger logger; private List> onClosedCallbackList; private boolean skipNegotiate; - private Supplier> accessTokenProvider; + private Single accessTokenProvider; private Map headers = new HashMap<>(); private ConnectionState connectionState = null; private HttpClient httpClient; private String stopError; - HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Supplier> accessTokenProvider) { + HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single accessTokenProvider) { if (url == null || url.isEmpty()) { throw new IllegalArgumentException("A valid url is required."); } @@ -49,7 +50,7 @@ public class HubConnection { if (accessTokenProvider != null) { this.accessTokenProvider = accessTokenProvider; } else { - this.accessTokenProvider = () -> CompletableFuture.completedFuture(null); + this.accessTokenProvider = Single.just(""); } if (httpClient != null) { @@ -153,14 +154,11 @@ public class HubConnection { } if (negotiateResponse.getAccessToken() != null) { - this.accessTokenProvider = () -> CompletableFuture.completedFuture(negotiateResponse.getAccessToken()); + this.accessTokenProvider = Single.just(negotiateResponse.getAccessToken()); String token = ""; - try { - // We know the future is already completed in this case - // It's fine to call get() on it. - token = this.accessTokenProvider.get().get(); - } catch (InterruptedException | ExecutionException e) { - } + // We know the Single is non blocking in this case + // It's fine to call blockingGet() on it. + token = this.accessTokenProvider.blockingGet(); this.headers.put("Authorization", "Bearer " + token); } @@ -179,20 +177,21 @@ public class HubConnection { /** * Starts a connection to the server. - * @return A completable future that completes when the connection has been established. + * @return A Completable that completes when the connection has been established. */ - public CompletableFuture start() { + public Completable start() { if (hubConnectionState != HubConnectionState.DISCONNECTED) { - return CompletableFuture.completedFuture(null); + return Completable.complete(); } handshakeReceived = false; - CompletableFuture tokenFuture = accessTokenProvider.get() - .thenAccept((token) -> { - if (token != null) { - this.headers.put("Authorization", "Bearer " + token); - } - }); + CompletableFuture tokenFuture = new CompletableFuture<>(); + accessTokenProvider.subscribe(token -> { + if (token != null && !token.isEmpty()) { + this.headers.put("Authorization", "Bearer " + token); + } + tokenFuture.complete(null); + }); stopError = null; CompletableFuture negotiate = null; @@ -202,7 +201,7 @@ public class HubConnection { negotiate = tokenFuture.thenCompose((v) -> CompletableFuture.completedFuture(baseUrl)); } - return negotiate.thenCompose((url) -> { + return Completable.fromFuture(negotiate.thenCompose(url -> { logger.log(LogLevel.Debug, "Starting HubConnection."); if (transport == null) { transport = new WebSocketTransport(headers, httpClient, logger); @@ -211,27 +210,21 @@ public class HubConnection { transport.setOnReceive(this.callback); transport.setOnClose((message) -> stopConnection(message)); - try { - return transport.start(url).thenCompose((future) -> { - String handshake = HandshakeProtocol.createHandshakeRequestMessage( - new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); - return transport.send(handshake).thenRun(() -> { - hubConnectionStateLock.lock(); - try { - hubConnectionState = HubConnectionState.CONNECTED; - connectionState = new ConnectionState(this); - logger.log(LogLevel.Information, "HubConnection started."); - } finally { - hubConnectionStateLock.unlock(); - } - }); + return transport.start(url).thenCompose((future) -> { + String handshake = HandshakeProtocol.createHandshakeRequestMessage( + new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); + return transport.send(handshake).thenRun(() -> { + hubConnectionStateLock.lock(); + try { + hubConnectionState = HubConnectionState.CONNECTED; + connectionState = new ConnectionState(this); + logger.log(LogLevel.Information, "HubConnection started."); + } finally { + hubConnectionStateLock.unlock(); + } }); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + }); + })); } private CompletableFuture startNegotiate(String url, int negotiateAttempts) { @@ -268,7 +261,7 @@ public class HubConnection { /** * Stops a connection to the server. * @param errorMessage An error message if the connected needs to be stopped because of an error. - * @return A completable future that completes when the connection has been stopped. + * @return A Completable that completes when the connection has been stopped. */ private CompletableFuture stop(String errorMessage) { hubConnectionStateLock.lock(); @@ -292,10 +285,10 @@ public class HubConnection { /** * Stops a connection to the server. - * @return A completable future that completes when the connection has been stopped. + * @return A Completable that completes when the connection has been stopped. */ - public CompletableFuture stop() { - return stop(null); + public Completable stop() { + return Completable.fromFuture(stop(null)); } private void stopConnection(String errorMessage) { @@ -344,7 +337,7 @@ public class HubConnection { sendHubMessage(invocationMessage); } - public CompletableFuture invoke(Class returnType, String method, Object... args) throws Exception { + public Single invoke(Class returnType, String method, Object... args) throws Exception { String id = connectionState.getNextInvocationId(); InvocationMessage invocationMessage = new InvocationMessage(id, method, args); @@ -372,7 +365,7 @@ public class HubConnection { // where the map doesn't have the future yet when the response is returned sendHubMessage(invocationMessage); - return future; + return Single.fromFuture(future); } private void sendHubMessage(HubMessage message) throws Exception { diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index f2510011ce..c76fc4adad 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -11,10 +11,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; +import io.reactivex.Completable; +import io.reactivex.Single; class HubConnectionTest { private static final String RECORD_SEPARATOR = "\u001e"; @@ -22,7 +25,7 @@ class HubConnectionTest { @Test public void checkHubConnectionState() throws Exception { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); - hubConnection.start(); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); @@ -33,7 +36,7 @@ class HubConnectionTest { public void transportCloseTriggersStopInHubConnection() throws Exception { MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); mockTransport.stop(); @@ -51,7 +54,7 @@ class HubConnectionTest { message.set(error.getMessage()); }); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); mockTransport.stopWithError(errorMessage); assertEquals(errorMessage, message.get()); @@ -355,13 +358,15 @@ class HubConnectionTest { hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); - CompletableFuture result = hubConnection.invoke(Integer.class, "echo", "message"); + AtomicBoolean done = new AtomicBoolean(); + Single result = hubConnection.invoke(Integer.class, "echo", "message"); + result.doOnSuccess(value -> done.set(true)); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); - assertFalse(result.isDone()); + assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); - assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); } @Test @@ -372,19 +377,23 @@ class HubConnectionTest { hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); - CompletableFuture result = hubConnection.invoke(Integer.class, "echo", "message"); - CompletableFuture result2 = hubConnection.invoke(String.class, "echo", "message"); + AtomicBoolean doneFirst = new AtomicBoolean(); + AtomicBoolean doneSecond = new AtomicBoolean(); + Single result = hubConnection.invoke(Integer.class, "echo", "message"); + Single result2 = hubConnection.invoke(String.class, "echo", "message"); + result.doOnSuccess(value -> doneFirst.set(true)); + result2.doOnSuccess(value -> doneSecond.set(true)); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); assertEquals("{\"type\":1,\"invocationId\":\"2\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[2]); - assertFalse(result.isDone()); - assertFalse(result2.isDone()); + assertFalse(doneFirst.get()); + assertFalse(doneSecond.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"2\",\"result\":\"message\"}" + RECORD_SEPARATOR); - assertEquals("message", result2.get(1000L, TimeUnit.MILLISECONDS)); - assertFalse(result.isDone()); + assertEquals("message", result2.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertFalse(doneFirst.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); - assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); } @Test @@ -395,14 +404,16 @@ class HubConnectionTest { hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); + AtomicBoolean done = new AtomicBoolean(); // int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type // which does not work for primitives we have to write special logic for that case. - CompletableFuture result = hubConnection.invoke(int.class, "echo", "message"); - assertFalse(result.isDone()); + Single result = hubConnection.invoke(int.class, "echo", "message"); + result.doOnSuccess(value -> done.set(true)); + assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); - assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); } @Test @@ -413,17 +424,19 @@ class HubConnectionTest { hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); - CompletableFuture result = hubConnection.invoke(int.class, "echo", "message"); - assertFalse(result.isDone()); + AtomicBoolean done = new AtomicBoolean(); + Single result = hubConnection.invoke(int.class, "echo", "message"); + result.doOnSuccess(value -> done.set(true)); + assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR); String exceptionMessage = null; try { - result.get(1000L, TimeUnit.MILLISECONDS); + result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet(); assertFalse(true); } catch (Exception ex) { - exceptionMessage = ex.getMessage(); + exceptionMessage = ex.getCause().getMessage(); } assertEquals("com.microsoft.signalr.HubException: There was an error", exceptionMessage); @@ -437,14 +450,16 @@ class HubConnectionTest { hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); - CompletableFuture result = hubConnection.invoke(int.class, "echo", "message"); - assertFalse(result.isDone()); + AtomicBoolean done = new AtomicBoolean(); + Single result = hubConnection.invoke(int.class, "echo", "message"); + result.doOnSuccess(value -> done.set(true)); + assertFalse(done.get()); hubConnection.stop(); boolean hasException = false; try { - result.get(1000L, TimeUnit.MILLISECONDS); + result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet(); assertFalse(true); } catch (CancellationException ex) { hasException = true; @@ -894,10 +909,10 @@ class HubConnectionTest { assertTrue(false); }, String.class); - CompletableFuture startFuture = hubConnection.start(); + Completable startFuture = hubConnection.start(); mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); - startFuture.get(1000, TimeUnit.MILLISECONDS); + startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS); RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR)); assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage()); } @@ -913,7 +928,7 @@ class HubConnectionTest { .build(); try { - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); } catch(Exception ex) {} List sentRequests = client.getSentRequests(); @@ -931,8 +946,9 @@ class HubConnectionTest { .withHttpClient(client) .build(); - ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS)); - assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getMessage()); + RuntimeException exception = assertThrows(RuntimeException.class, + () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); + assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getCause().getMessage()); } @Test @@ -949,7 +965,7 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); String[] sentMessages = transport.getSentMessages(); assertEquals(1, sentMessages.length); @@ -968,8 +984,9 @@ class HubConnectionTest { .withTransport(transport) .build(); - ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS)); - assertEquals("Test error.", exception.getCause().getMessage()); + RuntimeException exception = assertThrows(RuntimeException.class, + () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); + assertEquals("Test error.", exception.getCause().getCause().getMessage()); } @Test @@ -987,7 +1004,7 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); } @@ -1010,10 +1027,10 @@ class HubConnectionTest { .create("http://example.com") .withTransport(transport) .withHttpClient(client) - .withAccessTokenProvider(() -> CompletableFuture.completedFuture("secretToken")) + .withAccessTokenProvider(Single.just("secretToken")) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop(); assertEquals("Bearer secretToken", token.get()); @@ -1036,10 +1053,10 @@ class HubConnectionTest { .create("http://example.com") .withTransport(transport) .withHttpClient(client) - .withAccessTokenProvider(() -> CompletableFuture.completedFuture("secretToken")) + .withAccessTokenProvider(Single.just("secretToken")) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl()); hubConnection.stop(); @@ -1055,13 +1072,13 @@ class HubConnectionTest { .shouldSkipNegotiate(true) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().get(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().get(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -1081,13 +1098,13 @@ class HubConnectionTest { .withHttpClient(client) .build(); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().get(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); - hubConnection.start().get(1000, TimeUnit.MILLISECONDS); + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop().get(1000, TimeUnit.MILLISECONDS); + hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -1105,7 +1122,8 @@ class HubConnectionTest { .withHttpClient(client) .build(); - ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS)); - assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getMessage()); + RuntimeException exception = assertThrows(RuntimeException.class, + () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS)); + assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getCause().getMessage()); } } \ No newline at end of file diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java index 01ae138262..f559a62b69 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java @@ -32,7 +32,7 @@ public class Chat { }); //This is a blocking call - hubConnection.start().get(); + hubConnection.start().blockingAwait(); String message = ""; while (!message.equals("leave")) {