Refactor transient state in Java client (#24219)

* Refactor transient state in Java client

* some fb

* fix nullref

* fixup rebase

* fb

* apply some fb

* fix flaky

* lock

* check previous state
This commit is contained in:
Brennan 2020-09-17 17:59:27 -07:00 committed by GitHub
parent 0287b259d8
commit 3ee23fb771
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 489 additions and 384 deletions

View File

@ -18,6 +18,7 @@ class NegotiateResponse {
private String error;
private String finalUrl;
private int version;
private TransportEnum chosenTransport;
public NegotiateResponse(JsonReader reader) {
try {
@ -125,4 +126,12 @@ class NegotiateResponse {
public void setFinalUrl(String url) {
this.finalUrl = url;
}
public TransportEnum getChosenTransport() {
return chosenTransport;
}
public void setChosenTransport(TransportEnum chosenTransport) {
this.chosenTransport = chosenTransport;
}
}

View File

@ -524,74 +524,6 @@ class HubConnectionTest {
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", TestUtils.byteBufferToString(messages[3]));
}
@Test
public void streamMapIsClearedOnClose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream, 12);
stream.onNext("FirstItem");
ByteBuffer[] messages = mockTransport.getSentMessages();
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", TestUtils.byteBufferToString(messages[1]));
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", TestUtils.byteBufferToString(messages[2]));
stream.onComplete();
messages = mockTransport.getSentMessages();
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", TestUtils.byteBufferToString(messages[3]));
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getStreamMap().isEmpty());
}
@Test
public void streamMapEntriesRemovedOnStreamClose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream, 12);
ReplaySubject<String> secondStream = ReplaySubject.create();
hubConnection.send("SecondUploadStream", secondStream, 13);
stream.onNext("FirstItem");
secondStream.onNext("SecondItem");
ByteBuffer[] messages = mockTransport.getSentMessages();
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", TestUtils.byteBufferToString(messages[1]));
assertEquals("{\"type\":1,\"target\":\"SecondUploadStream\",\"arguments\":[13],\"streamIds\":[\"2\"]}\u001E", TestUtils.byteBufferToString(messages[2]));
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", TestUtils.byteBufferToString(messages[3]));
assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"SecondItem\"}\u001E", TestUtils.byteBufferToString(messages[4]));
assertEquals(2, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().keySet().contains("1"));
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
// Verify that we clear the entry from the stream map after we clear the first stream.
stream.onComplete();
assertEquals(1, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
secondStream.onError(new Exception("Exception"));
assertEquals(0, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().isEmpty());
messages = mockTransport.getSentMessages();
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", TestUtils.byteBufferToString(messages[5]));
assertEquals("{\"type\":3,\"invocationId\":\"2\",\"error\":\"java.lang.Exception: Exception\"}\u001E", TestUtils.byteBufferToString(messages[6]));
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getStreamMap().isEmpty());
}
@Test
public void useSameSubjectMultipleTimes() {
MockTransport mockTransport = new MockTransport();
@ -3006,7 +2938,6 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
assertEquals(TransportEnum.WEBSOCKETS, hubConnection.getTransportEnum());
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait());
@ -3025,13 +2956,52 @@ class HubConnectionTest {
.withHttpClient(client)
.build();
assertEquals(TransportEnum.LONG_POLLING, hubConnection.getTransportEnum());
RuntimeException exception = assertThrows(RuntimeException.class,
() -> hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait());
assertEquals(exception.getMessage(), "There were no compatible transports on the server.");
}
@Test
public void ConnectionRestartDoesNotResetUserTransportEnum() {
AtomicInteger requestCount = new AtomicInteger(0);
AtomicReference<CompletableSubject> blockGet = new AtomicReference<CompletableSubject>(CompletableSubject.create());
TestHttpClient client = new TestHttpClient()
.on("POST", (req) -> {
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
})
.on("POST", "http://example.com/negotiate?negotiateVersion=1",
(req) -> Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]},"
+ "{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))))
.on("GET", (req) -> {
if (requestCount.incrementAndGet() >= 3) {
blockGet.get().timeout(30, TimeUnit.SECONDS).blockingAwait();
}
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR)));
})
.on("DELETE", (req) -> {
blockGet.get().onComplete();
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
});
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(TransportEnum.LONG_POLLING)
.withHttpClient(client)
.build();
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getTransport() instanceof LongPollingTransport);
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
requestCount.set(0);
blockGet.set(CompletableSubject.create());
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getTransport() instanceof LongPollingTransport);
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
}
@Test
public void LongPollingTransportAccessTokenProviderThrowsOnInitialPoll() {
TestHttpClient client = new TestHttpClient()
@ -3191,10 +3161,10 @@ class HubConnectionTest {
closed.onComplete();
});
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
hubConnection.stop();
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
closed.timeout(30, TimeUnit.SECONDS).blockingAwait();
blockGet.onComplete();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@ -3229,12 +3199,12 @@ class HubConnectionTest {
hubConnection.onClosed((ex) -> {
closed.onComplete();
});
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
blockGet.onComplete();
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
closed.timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}