[Java] Make public API look like RxJava (#3103)

This commit is contained in:
BrennanConroy 2018-10-10 08:50:41 -07:00 committed by GitHub
parent b63c3816d5
commit cd33755bee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 98 deletions

2
.vscode/launch.json vendored
View File

@ -8,7 +8,7 @@
"cwd": "${workspaceFolder}/clients/java/",
"console": "externalTerminal",
"stopOnEntry": false,
"mainClass": "com.microsoft.aspnet.signalr.sample.Chat",
"mainClass": "com.microsoft.signalr.sample.Chat",
"args": ""
},
{

View File

@ -33,6 +33,7 @@ dependencies {
testRuntime 'org.junit.jupiter:junit-jupiter-engine:5.3.1'
implementation 'com.google.code.gson:gson:2.8.5'
implementation 'com.squareup.okhttp3:okhttp:3.11.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
}
spotless {

View File

@ -6,13 +6,15 @@ package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import io.reactivex.Single;
public class HttpHubConnectionBuilder {
private String url;
private Transport transport;
private Logger logger;
private HttpClient httpClient;
private boolean skipNegotiate;
private Supplier<CompletableFuture<String>> accessTokenProvider;
private Single<String> accessTokenProvider;
HttpHubConnectionBuilder(String url) {
this.url = url;
@ -39,7 +41,7 @@ public class HttpHubConnectionBuilder {
return this;
}
public HttpHubConnectionBuilder withAccessTokenProvider(Supplier<CompletableFuture<String>> accessTokenProvider) {
public HttpHubConnectionBuilder withAccessTokenProvider(Single<String> accessTokenProvider) {
this.accessTokenProvider = accessTokenProvider;
return this;
}

View File

@ -9,12 +9,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import io.reactivex.Completable;
import io.reactivex.Single;
public class HubConnection {
private static final String RECORD_SEPARATOR = "\u001e";
@ -32,13 +33,13 @@ public class HubConnection {
private Logger logger;
private List<Consumer<Exception>> onClosedCallbackList;
private boolean skipNegotiate;
private Supplier<CompletableFuture<String>> accessTokenProvider;
private Single<String> accessTokenProvider;
private Map<String, String> headers = new HashMap<>();
private ConnectionState connectionState = null;
private HttpClient httpClient;
private String stopError;
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Supplier<CompletableFuture<String>> accessTokenProvider) {
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
}
@ -49,7 +50,7 @@ public class HubConnection {
if (accessTokenProvider != null) {
this.accessTokenProvider = accessTokenProvider;
} else {
this.accessTokenProvider = () -> CompletableFuture.completedFuture(null);
this.accessTokenProvider = Single.just("");
}
if (httpClient != null) {
@ -153,14 +154,11 @@ public class HubConnection {
}
if (negotiateResponse.getAccessToken() != null) {
this.accessTokenProvider = () -> CompletableFuture.completedFuture(negotiateResponse.getAccessToken());
this.accessTokenProvider = Single.just(negotiateResponse.getAccessToken());
String token = "";
try {
// We know the future is already completed in this case
// It's fine to call get() on it.
token = this.accessTokenProvider.get().get();
} catch (InterruptedException | ExecutionException e) {
}
// We know the Single is non blocking in this case
// It's fine to call blockingGet() on it.
token = this.accessTokenProvider.blockingGet();
this.headers.put("Authorization", "Bearer " + token);
}
@ -179,20 +177,21 @@ public class HubConnection {
/**
* Starts a connection to the server.
* @return A completable future that completes when the connection has been established.
* @return A Completable that completes when the connection has been established.
*/
public CompletableFuture<Void> start() {
public Completable start() {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
return Completable.complete();
}
handshakeReceived = false;
CompletableFuture<Void> tokenFuture = accessTokenProvider.get()
.thenAccept((token) -> {
if (token != null) {
this.headers.put("Authorization", "Bearer " + token);
}
});
CompletableFuture<Void> tokenFuture = new CompletableFuture<>();
accessTokenProvider.subscribe(token -> {
if (token != null && !token.isEmpty()) {
this.headers.put("Authorization", "Bearer " + token);
}
tokenFuture.complete(null);
});
stopError = null;
CompletableFuture<String> negotiate = null;
@ -202,7 +201,7 @@ public class HubConnection {
negotiate = tokenFuture.thenCompose((v) -> CompletableFuture.completedFuture(baseUrl));
}
return negotiate.thenCompose((url) -> {
return Completable.fromFuture(negotiate.thenCompose(url -> {
logger.log(LogLevel.Debug, "Starting HubConnection.");
if (transport == null) {
transport = new WebSocketTransport(headers, httpClient, logger);
@ -211,27 +210,21 @@ public class HubConnection {
transport.setOnReceive(this.callback);
transport.setOnClose((message) -> stopConnection(message));
try {
return transport.start(url).thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
} finally {
hubConnectionStateLock.unlock();
}
});
return transport.start(url).thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
} finally {
hubConnectionStateLock.unlock();
}
});
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
});
}));
}
private CompletableFuture<String> startNegotiate(String url, int negotiateAttempts) {
@ -268,7 +261,7 @@ public class HubConnection {
/**
* Stops a connection to the server.
* @param errorMessage An error message if the connected needs to be stopped because of an error.
* @return A completable future that completes when the connection has been stopped.
* @return A Completable that completes when the connection has been stopped.
*/
private CompletableFuture<Void> stop(String errorMessage) {
hubConnectionStateLock.lock();
@ -292,10 +285,10 @@ public class HubConnection {
/**
* Stops a connection to the server.
* @return A completable future that completes when the connection has been stopped.
* @return A Completable that completes when the connection has been stopped.
*/
public CompletableFuture<Void> stop() {
return stop(null);
public Completable stop() {
return Completable.fromFuture(stop(null));
}
private void stopConnection(String errorMessage) {
@ -344,7 +337,7 @@ public class HubConnection {
sendHubMessage(invocationMessage);
}
public <T> CompletableFuture<T> invoke(Class<T> returnType, String method, Object... args) throws Exception {
public <T> Single<T> invoke(Class<T> returnType, String method, Object... args) throws Exception {
String id = connectionState.getNextInvocationId();
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);
@ -372,7 +365,7 @@ public class HubConnection {
// where the map doesn't have the future yet when the response is returned
sendHubMessage(invocationMessage);
return future;
return Single.fromFuture(future);
}
private void sendHubMessage(HubMessage message) throws Exception {

View File

@ -11,10 +11,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import io.reactivex.Completable;
import io.reactivex.Single;
class HubConnectionTest {
private static final String RECORD_SEPARATOR = "\u001e";
@ -22,7 +25,7 @@ class HubConnectionTest {
@Test
public void checkHubConnectionState() throws Exception {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
@ -33,7 +36,7 @@ class HubConnectionTest {
public void transportCloseTriggersStopInHubConnection() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
mockTransport.stop();
@ -51,7 +54,7 @@ class HubConnectionTest {
message.set(error.getMessage());
});
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
mockTransport.stopWithError(errorMessage);
assertEquals(errorMessage, message.get());
@ -355,13 +358,15 @@ class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
result.doOnSuccess(value -> done.set(true));
assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(result.isDone());
assertFalse(done.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet());
}
@Test
@ -372,19 +377,23 @@ class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
CompletableFuture<String> result2 = hubConnection.invoke(String.class, "echo", "message");
AtomicBoolean doneFirst = new AtomicBoolean();
AtomicBoolean doneSecond = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
Single<String> result2 = hubConnection.invoke(String.class, "echo", "message");
result.doOnSuccess(value -> doneFirst.set(true));
result2.doOnSuccess(value -> doneSecond.set(true));
assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertEquals("{\"type\":1,\"invocationId\":\"2\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[2]);
assertFalse(result.isDone());
assertFalse(result2.isDone());
assertFalse(doneFirst.get());
assertFalse(doneSecond.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"2\",\"result\":\"message\"}" + RECORD_SEPARATOR);
assertEquals("message", result2.get(1000L, TimeUnit.MILLISECONDS));
assertFalse(result.isDone());
assertEquals("message", result2.timeout(1000, TimeUnit.MILLISECONDS).blockingGet());
assertFalse(doneFirst.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet());
}
@Test
@ -395,14 +404,16 @@ class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
// int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type
// which does not work for primitives we have to write special logic for that case.
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
result.doOnSuccess(value -> done.set(true));
assertFalse(done.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet());
}
@Test
@ -413,17 +424,19 @@ class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
result.doOnSuccess(value -> done.set(true));
assertFalse(done.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR);
String exceptionMessage = null;
try {
result.get(1000L, TimeUnit.MILLISECONDS);
result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet();
assertFalse(true);
} catch (Exception ex) {
exceptionMessage = ex.getMessage();
exceptionMessage = ex.getCause().getMessage();
}
assertEquals("com.microsoft.signalr.HubException: There was an error", exceptionMessage);
@ -437,14 +450,16 @@ class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
result.doOnSuccess(value -> done.set(true));
assertFalse(done.get());
hubConnection.stop();
boolean hasException = false;
try {
result.get(1000L, TimeUnit.MILLISECONDS);
result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet();
assertFalse(true);
} catch (CancellationException ex) {
hasException = true;
@ -894,10 +909,10 @@ class HubConnectionTest {
assertTrue(false);
}, String.class);
CompletableFuture<Void> startFuture = hubConnection.start();
Completable startFuture = hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
startFuture.get(1000, TimeUnit.MILLISECONDS);
startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS);
RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR));
assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage());
}
@ -913,7 +928,7 @@ class HubConnectionTest {
.build();
try {
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
} catch(Exception ex) {}
List<HttpRequest> sentRequests = client.getSentRequests();
@ -931,8 +946,9 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS));
assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getMessage());
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getCause().getMessage());
}
@Test
@ -949,7 +965,7 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
String[] sentMessages = transport.getSentMessages();
assertEquals(1, sentMessages.length);
@ -968,8 +984,9 @@ class HubConnectionTest {
.withTransport(transport)
.build();
ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS));
assertEquals("Test error.", exception.getCause().getMessage());
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Test error.", exception.getCause().getCause().getMessage());
}
@Test
@ -987,7 +1004,7 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
}
@ -1010,10 +1027,10 @@ class HubConnectionTest {
.create("http://example.com")
.withTransport(transport)
.withHttpClient(client)
.withAccessTokenProvider(() -> CompletableFuture.completedFuture("secretToken"))
.withAccessTokenProvider(Single.just("secretToken"))
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
assertEquals("Bearer secretToken", token.get());
@ -1036,10 +1053,10 @@ class HubConnectionTest {
.create("http://example.com")
.withTransport(transport)
.withHttpClient(client)
.withAccessTokenProvider(() -> CompletableFuture.completedFuture("secretToken"))
.withAccessTokenProvider(Single.just("secretToken"))
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl());
hubConnection.stop();
@ -1055,13 +1072,13 @@ class HubConnectionTest {
.shouldSkipNegotiate(true)
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().get(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().get(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -1081,13 +1098,13 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().get(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().get(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -1105,7 +1122,8 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
ExecutionException exception = assertThrows(ExecutionException.class, () -> hubConnection.start().get(1000, TimeUnit.MILLISECONDS));
assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getMessage());
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getCause().getMessage());
}
}

View File

@ -32,7 +32,7 @@ public class Chat {
});
//This is a blocking call
hubConnection.start().get();
hubConnection.start().blockingAwait();
String message = "";
while (!message.equals("leave")) {