[Java] Pass Url to transport start so we can reuse the transport (#3051)

This commit is contained in:
BrennanConroy 2018-10-03 19:10:29 -07:00 committed by GitHub
parent 420c1d507d
commit 2ccf79a912
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 37 additions and 14 deletions

View File

@ -186,6 +186,7 @@ public class HubConnection {
return CompletableFuture.completedFuture(null);
}
handshakeReceived = false;
CompletableFuture<Void> tokenFuture = accessTokenProvider.get()
.thenAccept((token) -> {
if (token != null) {
@ -203,13 +204,13 @@ public class HubConnection {
return negotiate.thenCompose((url) -> {
logger.log(LogLevel.Debug, "Starting HubConnection.");
if (transport == null) {
transport = new WebSocketTransport(url, headers, httpClient, logger);
transport = new WebSocketTransport(headers, httpClient, logger);
}
transport.setOnReceive(this.callback);
try {
return transport.start().thenCompose((future) -> {
return transport.start(url).thenCompose((future) -> {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).thenRun(() -> {
@ -289,8 +290,6 @@ public class HubConnection {
HubException hubException = null;
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.DISCONNECTED;
if (errorMessage != null) {
hubException = new HubException(errorMessage);
} else if (t != null) {
@ -299,6 +298,7 @@ public class HubConnection {
connectionState.cancelOutstandingInvocations(hubException);
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
hubConnectionState = HubConnectionState.DISCONNECTED;
} finally {
hubConnectionStateLock.unlock();
}

View File

@ -37,6 +37,10 @@ class JsonHubProtocol implements HubProtocol {
@Override
public HubMessage[] parseMessages(String payload, InvocationBinder binder) throws Exception {
if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) {
throw new RuntimeException("Message is incomplete.");
}
String[] messages = payload.split(RECORD_SEPARATOR);
List<HubMessage> hubMessages = new ArrayList<>();
for (String str : messages) {
@ -48,7 +52,6 @@ class JsonHubProtocol implements HubProtocol {
JsonArray argumentsToken = null;
Object result = null;
JsonElement resultToken = null;
JsonReader reader = new JsonReader(new StringReader(str));
reader.beginObject();

View File

@ -6,7 +6,7 @@ package com.microsoft.aspnet.signalr;
import java.util.concurrent.CompletableFuture;
interface Transport {
CompletableFuture<Void> start() throws Exception;
CompletableFuture<Void> start(String url);
CompletableFuture<Void> send(String message);
void setOnReceive(OnReceiveCallBack callback);
void onReceive(String message) throws Exception;

View File

@ -19,8 +19,7 @@ class WebSocketTransport implements Transport {
private static final String WS = "ws";
private static final String WSS = "wss";
public WebSocketTransport(String url, Map<String, String> headers, HttpClient client, Logger logger) {
this.url = formatUrl(url);
public WebSocketTransport(Map<String, String> headers, HttpClient client, Logger logger) {
this.logger = logger;
this.client = client;
this.headers = headers;
@ -41,7 +40,8 @@ class WebSocketTransport implements Transport {
}
@Override
public CompletableFuture<Void> start() {
public CompletableFuture<Void> start(String url) {
this.url = formatUrl(url);
logger.log(LogLevel.Debug, "Starting Websocket connection.");
this.webSocketClient = client.createWebSocket(this.url, this.headers);
this.webSocketClient.setOnReceive((message) -> onReceive(message));

View File

@ -1019,6 +1019,7 @@ class HubConnectionTest {
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl());
hubConnection.stop();
assertEquals("Bearer newToken", token.get());
}

View File

@ -246,6 +246,16 @@ class JsonHubProtocolTest {
assertEquals("Invocation provides 1 argument(s) but target expects 2.", exception.getMessage());
}
@Test
public void errorWhileParsingIncompleteMessage() throws Exception {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":";
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42, 24 }));
RuntimeException exception = assertThrows(RuntimeException.class,
() -> jsonHubProtocol.parseMessages(stringifiedMessage, binder));
assertEquals("Message is incomplete.", exception.getMessage());
}
private class TestBinder implements InvocationBinder {
private Class<?>[] paramTypes = null;
private Class<?> returnType = null;

View File

@ -9,9 +9,11 @@ import java.util.concurrent.CompletableFuture;
class MockTransport implements Transport {
private OnReceiveCallBack onReceiveCallBack;
private ArrayList<String> sentMessages = new ArrayList<>();
private String url;
@Override
public CompletableFuture start() {
public CompletableFuture start(String url) {
this.url = url;
return CompletableFuture.completedFuture(null);
}
@ -43,4 +45,8 @@ class MockTransport implements Transport {
public String[] getSentMessages() {
return sentMessages.toArray(new String[sentMessages.size()]);
}
public String getUrl() {
return this.url;
}
}

View File

@ -14,8 +14,8 @@ import org.junit.jupiter.api.Test;
class WebSocketTransportTest {
@Test
public void WebsocketThrowsIfItCantConnect() throws Exception {
Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start().get(1, TimeUnit.SECONDS));
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
}
}

View File

@ -24,8 +24,11 @@ class WebSocketTransportUrlFormatTest {
@ParameterizedTest
@MethodSource("protocols")
public void checkWebsocketUrlProtocol(String url, String expectedUrl) throws URISyntaxException {
WebSocketTransport webSocketTransport = new WebSocketTransport(url, new HashMap<>(), new TestHttpClient(), new NullLogger());
public void checkWebsocketUrlProtocol(String url, String expectedUrl) {
WebSocketTransport webSocketTransport = new WebSocketTransport(new HashMap<>(), new TestHttpClient(), new NullLogger());
try {
webSocketTransport.start(url);
} catch (Exception e) {}
assertEquals(expectedUrl, webSocketTransport.getUrl());
}
}