[Java] Plumb RxJava through client (#3148)

This commit is contained in:
BrennanConroy 2018-10-23 10:52:26 -07:00 committed by GitHub
parent 0f5fc5b912
commit f7d10bec02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 242 additions and 217 deletions

View File

@ -7,10 +7,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Cookie;
@ -78,7 +79,7 @@ final class DefaultHttpClient extends HttpClient {
}
@Override
public CompletableFuture<HttpResponse> send(HttpRequest httpRequest) {
public Single<HttpResponse> send(HttpRequest httpRequest) {
Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl());
switch (httpRequest.getMethod()) {
@ -100,24 +101,24 @@ final class DefaultHttpClient extends HttpClient {
Request request = requestBuilder.build();
CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
SingleSubject<HttpResponse> responseSubject = SingleSubject.create();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
responseFuture.completeExceptionally(e.getCause());
responseSubject.onError(e.getCause());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
try (ResponseBody body = response.body()) {
HttpResponse httpResponse = new HttpResponse(response.code(), response.message(), body.string());
responseFuture.complete(httpResponse);
responseSubject.onSuccess(httpResponse);
}
}
});
return responseFuture;
return responseSubject;
}
@Override

View File

@ -5,7 +5,8 @@ package com.microsoft.signalr;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import io.reactivex.Single;
class HttpRequest {
private String method;
@ -74,46 +75,46 @@ class HttpResponse {
}
abstract class HttpClient {
public CompletableFuture<HttpResponse> get(String url) {
public Single<HttpResponse> get(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("GET");
return this.send(request);
}
public CompletableFuture<HttpResponse> get(String url, HttpRequest options) {
public Single<HttpResponse> get(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("GET");
return this.send(options);
}
public CompletableFuture<HttpResponse> post(String url) {
public Single<HttpResponse> post(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("POST");
return this.send(request);
}
public CompletableFuture<HttpResponse> post(String url, HttpRequest options) {
public Single<HttpResponse> post(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("POST");
return this.send(options);
}
public CompletableFuture<HttpResponse> delete(String url) {
public Single<HttpResponse> delete(String url) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("DELETE");
return this.send(request);
}
public CompletableFuture<HttpResponse> delete(String url, HttpRequest options) {
public Single<HttpResponse> delete(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("DELETE");
return this.send(options);
}
public abstract CompletableFuture<HttpResponse> send(HttpRequest request);
public abstract Single<HttpResponse> send(HttpRequest request);
public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers);
}

View File

@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.SingleSubject;
public class HubConnection {
private static final String RECORD_SEPARATOR = "\u001e";
@ -49,7 +51,7 @@ public class HubConnection {
private long keepAliveInterval = 15*1000;
private long serverTimeout = 30*1000;
private long tickRate = 1000;
private CompletableFuture<Void> handshakeResponseFuture;
private CompletableSubject handshakeResponseSubject;
private long handshakeResponseTimeout = 15*1000;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
@ -140,18 +142,18 @@ public class HubConnection {
handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
} catch (RuntimeException ex) {
RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex);
handshakeResponseFuture.completeExceptionally(exception);
handshakeResponseSubject.onError(exception);
throw exception;
}
if (handshakeResponse.getHandshakeError() != null) {
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
logger.error(errorMessage);
RuntimeException exception = new RuntimeException(errorMessage);
handshakeResponseFuture.completeExceptionally(exception);
handshakeResponseSubject.onError(exception);
throw exception;
}
handshakeReceived = true;
handshakeResponseFuture.complete(null);
handshakeResponseSubject.onComplete();
payload = payload.substring(handshakeLength);
// The payload only contained the handshake response so we can return.
@ -206,15 +208,21 @@ public class HubConnection {
private void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.schedule(() -> handshakeResponseFuture.completeExceptionally(
new TimeoutException("Timed out waiting for the server to respond to the handshake message.")), timeout, unit);
scheduledThreadPool.schedule(() -> {
// If onError is called on a completed subject the global error handler is called
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable()))
{
handshakeResponseSubject.onError(
new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
}
}, timeout, unit);
}
private CompletableFuture<NegotiateResponse> handleNegotiate(String url) {
private Single<NegotiateResponse> handleNegotiate(String url) {
HttpRequest request = new HttpRequest();
request.addHeaders(this.headers);
return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).thenCompose((response) -> {
return httpClient.post(Negotiate.resolveNegotiateUrl(url), request).map((response) -> {
if (response.getStatusCode() != 200) {
throw new RuntimeException(String.format("Unexpected status code returned from negotiate: %d %s.", response.getStatusCode(), response.getStatusText()));
}
@ -233,7 +241,7 @@ public class HubConnection {
this.headers.put("Authorization", "Bearer " + token);
}
return CompletableFuture.completedFuture(negotiateResponse);
return negotiateResponse;
});
}
@ -256,25 +264,27 @@ public class HubConnection {
return Completable.complete();
}
handshakeResponseFuture = new CompletableFuture<>();
handshakeResponseSubject = CompletableSubject.create();
handshakeReceived = false;
CompletableFuture<Void> tokenFuture = new CompletableFuture<>();
CompletableSubject tokenCompletable = CompletableSubject.create();
accessTokenProvider.subscribe(token -> {
if (token != null && !token.isEmpty()) {
this.headers.put("Authorization", "Bearer " + token);
}
tokenFuture.complete(null);
tokenCompletable.onComplete();
});
stopError = null;
CompletableFuture<String> negotiate = null;
Single<String> negotiate = null;
if (!skipNegotiate) {
negotiate = tokenFuture.thenCompose((v) -> startNegotiate(baseUrl, 0));
negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0)));
} else {
negotiate = tokenFuture.thenCompose((v) -> CompletableFuture.completedFuture(baseUrl));
negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(baseUrl)));
}
return Completable.fromFuture(negotiate.thenCompose(url -> {
CompletableSubject start = CompletableSubject.create();
negotiate.flatMapCompletable(url -> {
logger.debug("Starting HubConnection.");
if (transport == null) {
transport = new WebSocketTransport(headers, httpClient);
@ -283,16 +293,17 @@ public class HubConnection {
transport.setOnReceive(this.callback);
transport.setOnClose((message) -> stopConnection(message));
return transport.start(url).thenCompose((future) -> {
return transport.start(url).andThen(Completable.defer(() -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenCompose((innerFuture) -> {
return transport.send(handshake).andThen(Completable.defer(() -> {
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
return handshakeResponseFuture.thenRun(() -> {
return handshakeResponseSubject.andThen(Completable.defer(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
hubConnectionState = HubConnectionState.CONNECTED;
logger.info("HubConnection started.");
resetServerTimeout();
@ -320,18 +331,23 @@ public class HubConnection {
} finally {
hubConnectionStateLock.unlock();
}
});
});
});
}));
return Completable.complete();
}));
}));
}));
// subscribe makes this a "hot" completable so this runs immediately
}).subscribeWith(start);
return start;
}
private CompletableFuture<String> startNegotiate(String url, int negotiateAttempts) {
private Single<String> startNegotiate(String url, int negotiateAttempts) {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
return Single.just(null);
}
return handleNegotiate(url).thenCompose((response) -> {
return handleNegotiate(url).flatMap(response -> {
if (response.getRedirectUrl() != null && negotiateAttempts >= MAX_NEGOTIATE_ATTEMPTS) {
throw new RuntimeException("Negotiate redirection limit exceeded.");
}
@ -350,7 +366,7 @@ public class HubConnection {
}
}
return CompletableFuture.completedFuture(finalUrl);
return Single.just(finalUrl);
}
return startNegotiate(response.getRedirectUrl(), negotiateAttempts + 1);
@ -363,11 +379,11 @@ public class HubConnection {
* @param errorMessage An error message if the connected needs to be stopped because of an error.
* @return A Completable that completes when the connection has been stopped.
*/
private CompletableFuture<Void> stop(String errorMessage) {
private Completable stop(String errorMessage) {
hubConnectionStateLock.lock();
try {
if (hubConnectionState == HubConnectionState.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
return Completable.complete();
}
if (errorMessage != null) {
@ -389,7 +405,7 @@ public class HubConnection {
* @return A Completable that completes when the connection has been stopped.
*/
public Completable stop() {
return Completable.fromFuture(stop(null));
return stop(null);
}
private void stopConnection(String errorMessage) {
@ -409,7 +425,7 @@ public class HubConnection {
connectionState = null;
logger.info("HubConnection stopped.");
hubConnectionState = HubConnectionState.DISCONNECTED;
handshakeResponseFuture.complete(null);
handshakeResponseSubject.onComplete();
} finally {
hubConnectionStateLock.unlock();
}
@ -452,31 +468,27 @@ public class HubConnection {
String id = connectionState.getNextInvocationId();
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);
CompletableFuture<T> future = new CompletableFuture<>();
SingleSubject<T> subject = SingleSubject.create();
InvocationRequest irq = new InvocationRequest(returnType, id);
connectionState.addInvocation(irq);
// forward the invocation result or error to the user
// run continuations on a separate thread
CompletableFuture<Object> pendingCall = irq.getPendingCall();
pendingCall.whenCompleteAsync((result, error) -> {
if (error == null) {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
future.complete((T)result);
} else {
future.complete(returnType.cast(result));
}
Single<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onSuccess((T)result);
} else {
future.completeExceptionally(error);
subject.onSuccess(returnType.cast(result));
}
});
}, error -> subject.onError(error));
// Make sure the actual send is after setting up the future otherwise there is a race
// where the map doesn't have the future yet when the response is returned
// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendHubMessage(invocationMessage);
return Single.fromFuture(future);
return subject;
}
private void sendHubMessage(HubMessage message) {

View File

@ -3,11 +3,14 @@
package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CancellationException;
import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
class InvocationRequest {
private final Class<?> returnType;
private final CompletableFuture<Object> pendingCall = new CompletableFuture<>();
private final SingleSubject<Object> pendingCall = SingleSubject.create();
private final String invocationId;
InvocationRequest(Class<?> returnType, String invocationId) {
@ -17,21 +20,21 @@ class InvocationRequest {
public void complete(CompletionMessage completion) {
if (completion.getResult() != null) {
pendingCall.complete(completion.getResult());
pendingCall.onSuccess(completion.getResult());
} else {
pendingCall.completeExceptionally(new HubException(completion.getError()));
pendingCall.onError(new HubException(completion.getError()));
}
}
public void fail(Exception ex) {
pendingCall.completeExceptionally(ex);
pendingCall.onError(ex);
}
public void cancel() {
pendingCall.cancel(false);
pendingCall.onError(new CancellationException("Invocation was canceled."));
}
public CompletableFuture<Object> getPendingCall() {
public Single<Object> getPendingCall() {
return pendingCall;
}

View File

@ -4,12 +4,13 @@
package com.microsoft.signalr;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.reactivex.Completable;
import io.reactivex.subjects.CompletableSubject;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -24,8 +25,8 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
private OkHttpClient client;
private OnReceiveCallBack onReceive;
private BiConsumer<Integer, String> onClose;
private CompletableFuture<Void> startFuture = new CompletableFuture<>();
private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private CompletableSubject startSubject = CompletableSubject.create();
private CompletableSubject closeSubject = CompletableSubject.create();
private final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketWrapper.class);
@ -36,7 +37,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
}
@Override
public CompletableFuture<Void> start() {
public Completable start() {
Headers.Builder headerBuilder = new Headers.Builder();
for (String key : headers.keySet()) {
headerBuilder.add(key, headers.get(key));
@ -48,19 +49,19 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
.build();
this.websocketClient = client.newWebSocket(request, new SignalRWebSocketListener());
return startFuture;
return startSubject;
}
@Override
public CompletableFuture<Void> stop() {
public Completable stop() {
websocketClient.close(1000, "HubConnection stopped.");
return closeFuture;
return closeSubject;
}
@Override
public CompletableFuture<Void> send(String message) {
public Completable send(String message) {
websocketClient.send(message);
return CompletableFuture.completedFuture(null);
return Completable.complete();
}
@Override
@ -76,7 +77,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
private class SignalRWebSocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, Response response) {
startFuture.complete(null);
startSubject.onComplete();
}
@Override
@ -92,25 +93,25 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
onClose.accept(code, reason);
closeFuture.complete(null);
closeSubject.onComplete();
checkStartFailure();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.error("Websocket closed from an error: {}.", t.getMessage());
closeFuture.completeExceptionally(new RuntimeException(t));
closeSubject.onError(new RuntimeException(t));
onClose.accept(null, t.getMessage());
checkStartFailure();
}
private void checkStartFailure() {
// If the start future hasn't completed yet, then we need to complete it
// If the start task hasn't completed yet, then we need to complete it
// exceptionally.
if (!startFuture.isDone()) {
if (!startSubject.hasComplete()) {
String errorMessage = "There was an error starting the Websockets transport.";
logger.error("Websocket closed from an error: {}.", errorMessage);
startFuture.completeExceptionally(new RuntimeException(errorMessage));
startSubject.onError(new RuntimeException(errorMessage));
}
}
}

View File

@ -3,14 +3,15 @@
package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import io.reactivex.Completable;
interface Transport {
CompletableFuture<Void> start(String url);
CompletableFuture<Void> send(String message);
Completable start(String url);
Completable send(String message);
void setOnReceive(OnReceiveCallBack callback);
void onReceive(String message);
void setOnClose(Consumer<String> onCloseCallback);
CompletableFuture<Void> stop();
Completable stop();
}

View File

@ -4,12 +4,13 @@
package com.microsoft.signalr;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.reactivex.Completable;
class WebSocketTransport implements Transport {
private WebSocketWrapper webSocketClient;
private OnReceiveCallBack onReceiveCallBack;
@ -46,7 +47,7 @@ class WebSocketTransport implements Transport {
}
@Override
public CompletableFuture<Void> start(String url) {
public Completable start(String url) {
this.url = formatUrl(url);
logger.debug("Starting Websocket connection.");
this.webSocketClient = client.createWebSocket(this.url, this.headers);
@ -57,11 +58,11 @@ class WebSocketTransport implements Transport {
}
});
return webSocketClient.start().thenRun(() -> logger.info("WebSocket transport connected to: {}.", this.url));
return webSocketClient.start().doOnComplete(() -> logger.info("WebSocket transport connected to: {}.", this.url));
}
@Override
public CompletableFuture<Void> send(String message) {
public Completable send(String message) {
return webSocketClient.send(message);
}
@ -82,8 +83,8 @@ class WebSocketTransport implements Transport {
}
@Override
public CompletableFuture<Void> stop() {
return webSocketClient.stop().whenComplete((i, j) -> logger.info("WebSocket connection stopped."));
public Completable stop() {
return webSocketClient.stop().doOnEvent(t -> logger.info("WebSocket connection stopped."));
}
void onClose(int code, String reason) {

View File

@ -3,15 +3,16 @@
package com.microsoft.signalr;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import io.reactivex.Completable;
abstract class WebSocketWrapper {
public abstract CompletableFuture<Void> start();
public abstract Completable start();
public abstract CompletableFuture<Void> stop();
public abstract Completable stop();
public abstract CompletableFuture<Void> send(String message);
public abstract Completable send(String message);
public abstract void setOnReceive(OnReceiveCallBack onReceive);

View File

@ -7,7 +7,6 @@ import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -16,8 +15,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
class HubConnectionTest {
private static final String RECORD_SEPARATOR = "\u001e";
@ -25,7 +24,7 @@ class HubConnectionTest {
@Test
public void checkHubConnectionState() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
@ -36,7 +35,7 @@ class HubConnectionTest {
public void transportCloseTriggersStopInHubConnection() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
mockTransport.stop();
@ -54,7 +53,7 @@ class HubConnectionTest {
message.set(error.getMessage());
});
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
mockTransport.stopWithError(errorMessage);
assertEquals(errorMessage, message.get());
@ -69,10 +68,9 @@ class HubConnectionTest {
.shouldSkipNegotiate(true)
.withHandshakeResponseTimeout(100)
.build();
Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals(ExecutionException.class, exception.getCause().getClass());
assertEquals(TimeoutException.class, exception.getCause().getCause().getClass());
assertEquals(exception.getCause().getCause().getMessage(), "Timed out waiting for the server to respond to the handshake message.");
Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals(TimeoutException.class, exception.getCause().getClass());
assertEquals("Timed out waiting for the server to respond to the handshake message.", exception.getCause().getMessage());
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -81,7 +79,7 @@ class HubConnectionTest {
Transport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
@ -93,7 +91,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -108,6 +106,7 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait();
Throwable exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{" + RECORD_SEPARATOR));
assertEquals("An invalid handshake response was received from the server.", exception.getMessage());
@ -120,6 +119,7 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait();
Throwable exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"error\":\"Requested protocol 'messagepack' is not available.\"}" + RECORD_SEPARATOR));
assertEquals("Error in handshake Requested protocol 'messagepack' is not available.", exception.getMessage());
}
@ -136,7 +136,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -160,7 +160,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -187,7 +187,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -212,7 +212,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -241,7 +241,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -273,7 +273,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -308,7 +308,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String message = mockTransport.getSentMessages()[0];
String expectedHanshakeRequest = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
@ -336,7 +336,7 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
try {
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
@ -360,7 +360,7 @@ class HubConnectionTest {
hubConnection.on("add", action, Double.class);
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -372,7 +372,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
@ -390,7 +390,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean doneFirst = new AtomicBoolean();
AtomicBoolean doneSecond = new AtomicBoolean();
@ -416,7 +416,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean done = new AtomicBoolean();
// int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type
@ -435,7 +435,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -448,11 +448,11 @@ class HubConnectionTest {
try {
result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet();
assertFalse(true);
} catch (Exception ex) {
exceptionMessage = ex.getCause().getMessage();
} catch (HubException ex) {
exceptionMessage = ex.getMessage();
}
assertEquals("com.microsoft.signalr.HubException: There was an error", exceptionMessage);
assertEquals("There was an error", exceptionMessage);
}
@Test
@ -460,7 +460,7 @@ class HubConnectionTest {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -469,15 +469,15 @@ class HubConnectionTest {
hubConnection.stop();
boolean hasException = false;
RuntimeException hasException = null;
try {
result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet();
assertFalse(true);
} catch (CancellationException ex) {
hasException = true;
hasException = ex;
}
assertTrue(hasException);
assertEquals("Invocation was canceled.", hasException.getMessage());
}
@Test
@ -491,7 +491,7 @@ class HubConnectionTest {
value.getAndUpdate((val) -> val + 1);
});
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -509,7 +509,7 @@ class HubConnectionTest {
value.set(param);
}, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World");
@ -533,7 +533,7 @@ class HubConnectionTest {
value2.set(param2);
}, String.class, Double.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\", 12]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World", 12);
@ -561,7 +561,7 @@ class HubConnectionTest {
value3.set(param3);
}, String.class, String.class, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "A", "B", "C");
@ -593,7 +593,7 @@ class HubConnectionTest {
value4.set(param4);
}, String.class, String.class, String.class, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\", \"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -628,7 +628,7 @@ class HubConnectionTest {
value5.set(param5);
}, String.class, String.class, String.class, Boolean.class, Double.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12 ]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -667,7 +667,7 @@ class HubConnectionTest {
value6.set(param6);
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -710,7 +710,7 @@ class HubConnectionTest {
value7.set(param7);
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -757,7 +757,7 @@ class HubConnectionTest {
value8.set(param8);
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class, String.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\",\"F\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
assertEquals("A", value1.get());
@ -789,7 +789,7 @@ class HubConnectionTest {
value1.set(param1);
}, Custom.class);
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[{\"number\":1,\"str\":\"A\",\"bools\":[true,false]}]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -802,18 +802,19 @@ class HubConnectionTest {
}
@Test
public void receiveHandshakeResponseAndMessage() {
public void receiveHandshakeResponseAndMessage() {
AtomicReference<Double> value = new AtomicReference<Double>(0.0);
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.on("inc", () ->{
hubConnection.on("inc", () -> {
assertEquals(Double.valueOf(0), value.get());
value.getAndUpdate((val) -> val + 1);
});
// On start we're going to receive the handshake response and also an invocation in the same payload.
hubConnection.start();
mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait();
String expectedSentMessage = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
assertEquals(expectedSentMessage, mockTransport.getSentMessages()[0]);
@ -827,7 +828,7 @@ class HubConnectionTest {
public void onClosedCallbackRunsWhenStopIsCalled() {
AtomicReference<String> value1 = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
hubConnection.onClosed((ex) -> {
assertNull(value1.get());
value1.set("Closed callback ran.");
@ -843,7 +844,7 @@ class HubConnectionTest {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
hubConnection.onClosed((ex) -> {
assertNull(value1.get());
@ -871,7 +872,7 @@ class HubConnectionTest {
hubConnection.onClosed((ex) -> {
assertEquals(ex.getMessage(), "There was an error");
});
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -883,10 +884,10 @@ class HubConnectionTest {
@Test
public void callingStartOnStartedHubConnectionNoOps() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
@ -910,9 +911,8 @@ class HubConnectionTest {
assertTrue(false);
}, String.class);
Completable startFuture = hubConnection.start();
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS);
RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR));
assertEquals("Invocation provides 0 argument(s) but target expects 1.", exception.getMessage());
}
@ -920,16 +920,15 @@ class HubConnectionTest {
@Test
public void negotiateSentOnStart() {
TestHttpClient client = new TestHttpClient()
.on("POST", (req) -> CompletableFuture.completedFuture(new HttpResponse(404, "", "")));
.on("POST", (req) -> Single.just(new HttpResponse(404, "", "")));
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withHttpClient(client)
.build();
try {
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
} catch(Exception ex) {}
Exception exception = assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals("Unexpected status code returned from negotiate: 404 .", exception.getMessage());
List<HttpRequest> sentRequests = client.getSentRequests();
assertEquals(1, sentRequests.size());
@ -939,7 +938,7 @@ class HubConnectionTest {
@Test
public void negotiateThatRedirectsForeverFailsAfter100Tries() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}")));
(req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://example.com\"}")));
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
@ -947,14 +946,14 @@ class HubConnectionTest {
.build();
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Negotiate redirection limit exceeded.", exception.getCause().getCause().getMessage());
() -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals("Negotiate redirection limit exceeded.", exception.getMessage());
}
@Test
public void afterSuccessfulNegotiateConnectsWithTransport() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "",
(req) -> Single.just(new HttpResponse(200, "",
"{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
@ -965,7 +964,7 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
String[] sentMessages = transport.getSentMessages();
assertEquals(1, sentMessages.length);
@ -975,7 +974,7 @@ class HubConnectionTest {
@Test
public void negotiateThatReturnsErrorThrowsFromStart() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"error\":\"Test error.\"}")));
(req) -> Single.just(new HttpResponse(200, "", "{\"error\":\"Test error.\"}")));
MockTransport transport = new MockTransport(true);
HubConnection hubConnection = HubConnectionBuilder
@ -985,16 +984,16 @@ class HubConnectionTest {
.build();
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Test error.", exception.getCause().getCause().getMessage());
() -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals("Test error.", exception.getMessage());
}
@Test
public void negotiateRedirectIsFollowed() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
(req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate",
(req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
(req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
MockTransport transport = new MockTransport(true);
@ -1004,7 +1003,7 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
}
@ -1016,8 +1015,7 @@ class HubConnectionTest {
.on("POST", "http://example.com/negotiate",
(req) -> {
token.set(req.getHeaders().get("Authorization"));
return CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
@ -1029,7 +1027,7 @@ class HubConnectionTest {
.withAccessTokenProvider(Single.just("secretToken"))
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
assertEquals("Bearer secretToken", token.get());
@ -1039,11 +1037,10 @@ class HubConnectionTest {
public void accessTokenProviderIsOverriddenFromRedirectNegotiate() {
AtomicReference<String> token = new AtomicReference<>();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> CompletableFuture.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"newToken\"}")))
.on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"newToken\"}")))
.on("POST", "http://testexample.com/negotiate", (req) -> {
token.set(req.getHeaders().get("Authorization"));
return CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
@ -1055,7 +1052,7 @@ class HubConnectionTest {
.withAccessTokenProvider(Single.just("secretToken"))
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl());
hubConnection.stop();
@ -1067,14 +1064,14 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.setServerTimeout(1);
hubConnection.setTickRate(1);
CompletableFuture<Exception> closedFuture = new CompletableFuture<>();
SingleSubject<Exception> closedSubject = SingleSubject.create();
hubConnection.onClosed((e) -> {
closedFuture.complete(e);
closedSubject.onSuccess(e);
});
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals("Server timeout elapsed without receiving a message from the server.", closedFuture.get(1000, TimeUnit.MILLISECONDS).getMessage());
assertEquals("Server timeout elapsed without receiving a message from the server.", closedSubject.timeout(1, TimeUnit.SECONDS).blockingGet().getMessage());
}
@Test
@ -1084,7 +1081,7 @@ class HubConnectionTest {
hubConnection.setKeepAliveInterval(1);
hubConnection.setTickRate(1);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
TimeUnit.MILLISECONDS.sleep(100);
hubConnection.stop();
@ -1103,8 +1100,7 @@ class HubConnectionTest {
.on("POST", "http://example.com/negotiate",
(req) -> {
header.set(req.getHeaders().get("ExampleHeader"));
return CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
@ -1116,7 +1112,7 @@ class HubConnectionTest {
.withHeader("ExampleHeader", "ExampleValue")
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
assertEquals("ExampleValue", header.get());
@ -1129,8 +1125,7 @@ class HubConnectionTest {
.on("POST", "http://example.com/negotiate",
(req) -> {
header.set(req.getHeaders().get("ExampleHeader"));
return CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
return Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"));
});
@ -1143,7 +1138,7 @@ class HubConnectionTest {
.withHeader("ExampleHeader", "New Value")
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
assertEquals("New Value", header.get());
@ -1158,13 +1153,13 @@ class HubConnectionTest {
.shouldSkipNegotiate(true)
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -1172,10 +1167,8 @@ class HubConnectionTest {
public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() {
MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate", (req) -> CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
.on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
HubConnection hubConnection = HubConnectionBuilder
@ -1184,13 +1177,13 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait(1000, TimeUnit.MILLISECONDS);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -1198,8 +1191,7 @@ class HubConnectionTest {
public void non200FromNegotiateThrowsError() {
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate",
(req) -> CompletableFuture
.completedFuture(new HttpResponse(500, "Internal server error", "")));
(req) -> Single.just(new HttpResponse(500, "Internal server error", "")));
MockTransport transport = new MockTransport();
HubConnection hubConnection = HubConnectionBuilder
@ -1209,7 +1201,7 @@ class HubConnectionTest {
.build();
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS));
assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getCause().getCause().getMessage());
() -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getMessage());
}
}

View File

@ -4,9 +4,11 @@
package com.microsoft.signalr;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import io.reactivex.Completable;
import io.reactivex.subjects.CompletableSubject;
class MockTransport implements Transport {
private OnReceiveCallBack onReceiveCallBack;
private ArrayList<String> sentMessages = new ArrayList<>();
@ -14,6 +16,8 @@ class MockTransport implements Transport {
private Consumer<String> onClose;
final private boolean ignorePings;
final private boolean autoHandshake;
final private CompletableSubject startSubject = CompletableSubject.create();
final private CompletableSubject stopSubject = CompletableSubject.create();
private static final String RECORD_SEPARATOR = "\u001e";
@ -31,7 +35,7 @@ class MockTransport implements Transport {
}
@Override
public CompletableFuture<Void> start(String url) {
public Completable start(String url) {
this.url = url;
if (autoHandshake) {
try {
@ -40,15 +44,16 @@ class MockTransport implements Transport {
throw new RuntimeException(e);
}
}
return CompletableFuture.completedFuture(null);
startSubject.onComplete();
return startSubject;
}
@Override
public CompletableFuture<Void> send(String message) {
public Completable send(String message) {
if (!(ignorePings && message.equals("{\"type\":6}" + RECORD_SEPARATOR))) {
sentMessages.add(message);
}
return CompletableFuture.completedFuture(null);
return Completable.complete();
}
@Override
@ -67,9 +72,10 @@ class MockTransport implements Transport {
}
@Override
public CompletableFuture<Void> stop() {
public Completable stop() {
onClose.accept(null);
return CompletableFuture.completedFuture(null);
stopSubject.onComplete();
return stopSubject;
}
public void stopWithError(String errorMessage) {
@ -87,4 +93,12 @@ class MockTransport implements Transport {
public String getUrl() {
return this.url;
}
public Completable getStartTask() {
return startSubject;
}
public Completable getStopTask() {
return stopSubject;
}
}

View File

@ -6,24 +6,23 @@ package com.microsoft.signalr;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import io.reactivex.Single;
class TestHttpClient extends HttpClient {
private Function<HttpRequest, CompletableFuture<HttpResponse>> handler;
private Function<HttpRequest, Single<HttpResponse>> handler;
private List<HttpRequest> sentRequests;
public TestHttpClient() {
this.sentRequests = new ArrayList<>();
this.handler = (req) -> {
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl())));
return future;
return Single.error(new RuntimeException(String.format("Request has no handler: %s %s", req.getMethod(), req.getUrl())));
};
}
@Override
public CompletableFuture<HttpResponse> send(HttpRequest request) {
public Single<HttpResponse> send(HttpRequest request) {
this.sentRequests.add(request);
return this.handler.apply(request);
}
@ -32,13 +31,13 @@ class TestHttpClient extends HttpClient {
return sentRequests;
}
public TestHttpClient on(Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
public TestHttpClient on(Function<HttpRequest, Single<HttpResponse>> handler) {
this.handler = (req) -> handler.apply(req);
return this;
}
public TestHttpClient on(String method, Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
Function<HttpRequest, CompletableFuture<HttpResponse>> oldHandler = this.handler;
public TestHttpClient on(String method, Function<HttpRequest, Single<HttpResponse>> handler) {
Function<HttpRequest, Single<HttpResponse>> oldHandler = this.handler;
this.handler = (req) -> {
if (req.getMethod().equals(method)) {
return handler.apply(req);
@ -50,8 +49,8 @@ class TestHttpClient extends HttpClient {
return this;
}
public TestHttpClient on(String method, String url, Function<HttpRequest, CompletableFuture<HttpResponse>> handler) {
Function<HttpRequest, CompletableFuture<HttpResponse>> oldHandler = this.handler;
public TestHttpClient on(String method, String url, Function<HttpRequest, Single<HttpResponse>> handler) {
Function<HttpRequest, Single<HttpResponse>> oldHandler = this.handler;
this.handler = (req) -> {
if (req.getMethod().equals(method) && req.getUrl().equals(url)) {
return handler.apply(req);

View File

@ -6,7 +6,6 @@ package com.microsoft.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@ -15,7 +14,7 @@ class WebSocketTransportTest {
@Test
public void WebsocketThrowsIfItCantConnect() {
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
RuntimeException exception = assertThrows(RuntimeException.class, () -> transport.start("http://www.example.com").blockingAwait(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getMessage());
}
}