Java client connection state part 1 (#24166)

This commit is contained in:
Brennan 2020-08-24 16:45:33 -07:00 committed by GitHub
parent f2c3c11a2d
commit db77380c84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 75 deletions

View File

@ -66,6 +66,7 @@ public class HubConnection implements AutoCloseable {
private final int negotiateVersion = 1;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
private ScheduledExecutorService handshakeTimeout = null;
private Completable start;
/**
* Sets the server timeout interval for the connection.
@ -341,83 +342,99 @@ public class HubConnection implements AutoCloseable {
* @return A Completable that completes when the connection has been established.
*/
public Completable start() {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return Completable.complete();
}
CompletableSubject localStart = CompletableSubject.create();
handshakeResponseSubject = CompletableSubject.create();
handshakeReceived = false;
CompletableSubject tokenCompletable = CompletableSubject.create();
localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString());
if (headers != null) {
this.localHeaders.putAll(headers);
}
accessTokenProvider.subscribe(token -> {
if (token != null && !token.isEmpty()) {
this.localHeaders.put("Authorization", "Bearer " + token);
hubConnectionStateLock.lock();
try {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
logger.debug("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately.", hubConnectionState);
return start;
}
tokenCompletable.onComplete();
}, error -> {
tokenCompletable.onError(error);
});
stopError = null;
Single<NegotiateResponse> negotiate = null;
if (!skipNegotiate) {
negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0)));
} else {
negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl))));
}
hubConnectionState = HubConnectionState.CONNECTING;
start = localStart;
CompletableSubject start = CompletableSubject.create();
handshakeResponseSubject = CompletableSubject.create();
handshakeReceived = false;
CompletableSubject tokenCompletable = CompletableSubject.create();
localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString());
if (headers != null) {
this.localHeaders.putAll(headers);
}
negotiate.flatMapCompletable(negotiateResponse -> {
logger.debug("Starting HubConnection.");
if (transport == null) {
Single<String> tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider;
switch (transportEnum) {
case LONG_POLLING:
transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider);
break;
default:
transport = new WebSocketTransport(localHeaders, httpClient);
accessTokenProvider.subscribe(token -> {
if (token != null && !token.isEmpty()) {
this.localHeaders.put("Authorization", "Bearer " + token);
}
tokenCompletable.onComplete();
}, error -> {
tokenCompletable.onError(error);
});
stopError = null;
Single<NegotiateResponse> negotiate = null;
if (!skipNegotiate) {
negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0)));
} else {
negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl))));
}
transport.setOnReceive(this.callback);
transport.setOnClose((message) -> stopConnection(message));
negotiate.flatMapCompletable(negotiateResponse -> {
logger.debug("Starting HubConnection.");
if (transport == null) {
Single<String> tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider;
switch (transportEnum) {
case LONG_POLLING:
transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider);
break;
default:
transport = new WebSocketTransport(localHeaders, httpClient);
}
}
return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> {
ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
transport.setOnReceive(this.callback);
transport.setOnClose((message) -> stopConnection(message));
connectionState = new ConnectionState(this);
return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> {
ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
return transport.send(handshake).andThen(Completable.defer(() -> {
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
return handshakeResponseSubject.andThen(Completable.defer(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
logger.info("HubConnection started.");
resetServerTimeout();
//Don't send pings if we're using long polling.
if (transportEnum != TransportEnum.LONG_POLLING) {
activatePingTimer();
connectionState = new ConnectionState(this);
return transport.send(handshake).andThen(Completable.defer(() -> {
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
return handshakeResponseSubject.andThen(Completable.defer(() -> {
hubConnectionStateLock.lock();
try {
hubConnectionState = HubConnectionState.CONNECTED;
logger.info("HubConnection started.");
resetServerTimeout();
//Don't send pings if we're using long polling.
if (transportEnum != TransportEnum.LONG_POLLING) {
activatePingTimer();
}
} finally {
hubConnectionStateLock.unlock();
}
} finally {
hubConnectionStateLock.unlock();
}
return Completable.complete();
return Completable.complete();
}));
}));
}));
}));
// subscribe makes this a "hot" completable so this runs immediately
}).subscribeWith(start);
// subscribe makes this a "hot" completable so this runs immediately
}).subscribe(() -> {
localStart.onComplete();
}, error -> {
hubConnectionStateLock.lock();
hubConnectionState = HubConnectionState.DISCONNECTED;
hubConnectionStateLock.unlock();
localStart.onError(error);
});
} finally {
hubConnectionStateLock.unlock();
}
return start;
return localStart;
}
private void activatePingTimer() {
@ -445,8 +462,8 @@ public class HubConnection implements AutoCloseable {
}
private Single<NegotiateResponse> startNegotiate(String url, int negotiateAttempts) {
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
return Single.just(null);
if (hubConnectionState != HubConnectionState.CONNECTING) {
throw new RuntimeException("HubConnection trying to negotiate when not in the CONNECTING state.");
}
return handleNegotiate(url).flatMap(response -> {

View File

@ -9,4 +9,5 @@ package com.microsoft.signalr;
public enum HubConnectionState {
CONNECTED,
DISCONNECTED,
CONNECTING,
}

View File

@ -24,6 +24,7 @@ import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
@ -2577,9 +2578,10 @@ class HubConnectionTest {
value.getAndUpdate((val) -> val + 1);
});
SingleSubject<ByteBuffer> handshakeMessageTask = mockTransport.getNextSentMessage();
// On start we're going to receive the handshake response and also an invocation in the same payload.
hubConnection.start();
mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait();
ByteBuffer sentMessage = handshakeMessageTask.timeout(1, TimeUnit.SECONDS).blockingGet();
String expectedSentMessage = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
assertEquals(expectedSentMessage, TestUtils.byteBufferToString(mockTransport.getSentMessages()[0]));
@ -2647,7 +2649,7 @@ class HubConnectionTest {
}
@Test
public void callingStartOnStartedHubConnectionNoOps() {
public void callingStartOnStartedHubConnectionNoops() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -2655,7 +2657,35 @@ class HubConnectionTest {
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop();
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void callingStartOnStartingHubConnectionWaitsForOriginalStart() {
CompletableSubject startedAccessToken = CompletableSubject.create();
CompletableSubject continueAccessToken = CompletableSubject.create();
HubConnection hubConnection = HubConnectionBuilder.create("http://example.com")
.withTransportImplementation(new MockTransport(true))
.withHttpClient(new TestHttpClient())
.withAccessTokenProvider(Single.defer(() -> {
startedAccessToken.onComplete();
continueAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait();
return Single.just("test");
}).subscribeOn(Schedulers.newThread()))
.shouldSkipNegotiate(true)
.build();
Completable start = hubConnection.start();
startedAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTING, hubConnection.getConnectionState());
Completable start2 = hubConnection.start();
continueAccessToken.onComplete();
start.timeout(1, TimeUnit.SECONDS).blockingAwait();
start2.timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -3595,9 +3625,6 @@ class HubConnectionTest {
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait();
assertEquals("ExampleValue", beforeRedirectHeader.get());
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
assertEquals("Bearer redirectToken", afterRedirectHeader.get());
// Making sure you can do this after restarting the HubConnection.
@ -3605,9 +3632,6 @@ class HubConnectionTest {
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
hubConnection.stop().blockingAwait();
assertEquals("ExampleValue", beforeRedirectHeader.get());
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
assertEquals("Bearer redirectToken", afterRedirectHeader.get());
}
@ -3699,7 +3723,7 @@ class HubConnectionTest {
}
@Test
public void hubConnectionCloseCallsStop() throws Exception {
public void hubConnectionCloseCallsStop() {
MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"url\":\"http://testexample.com/\"}"))))

View File

@ -8,7 +8,6 @@ import java.util.Scanner;
import com.microsoft.signalr.HubConnection;
import com.microsoft.signalr.HubConnectionBuilder;
public class Chat {
public static void main(final String[] args) throws Exception {
System.out.println("Enter the URL of the SignalR Chat you want to join");
@ -33,7 +32,7 @@ public class Chat {
while (!message.equals("leave")) {
// Scans the next token of the input as an int.
message = reader.nextLine();
hubConnection.send("Send", message);
hubConnection.send("Send", "Java", message);
}
hubConnection.stop().blockingAwait();