[Java] Fix close with LongPolling (#25582)

This commit is contained in:
Brennan 2020-09-08 10:54:09 -07:00 committed by GitHub
parent 475fc569d9
commit 9cbf55b094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 6 deletions

View File

@ -527,7 +527,9 @@ public class HubConnection implements AutoCloseable {
hubConnectionStateLock.unlock();
}
return transport.stop();
Completable stop = transport.stop();
stop.onErrorComplete().subscribe();
return stop;
}
/**

View File

@ -19,7 +19,7 @@ import io.reactivex.subjects.CompletableSubject;
class LongPollingTransport implements Transport {
private OnReceiveCallBack onReceiveCallBack;
private TransportOnClosedCallback onClose;
private TransportOnClosedCallback onClose = (reason) -> {};
private String url;
private final HttpClient client;
private final HttpClient pollingClient;
@ -30,6 +30,7 @@ class LongPollingTransport implements Transport {
private String pollUrl;
private String closeError;
private CompletableSubject receiveLoop = CompletableSubject.create();
private CompletableSubject closeSubject = CompletableSubject.create();
private ExecutorService threadPool;
private ExecutorService onReceiveThread;
private AtomicBoolean stopCalled = new AtomicBoolean(false);
@ -157,7 +158,7 @@ class LongPollingTransport implements Transport {
public Completable stop() {
if (stopCalled.compareAndSet(false, true)) {
this.active = false;
return this.updateHeaderToken().andThen(Completable.defer(() -> {
Completable stopCompletable = this.updateHeaderToken().andThen(Completable.defer(() -> {
HttpRequest request = new HttpRequest();
request.addHeaders(headers);
return this.pollingClient.delete(this.url, request).ignoreElement()
@ -168,8 +169,10 @@ class LongPollingTransport implements Transport {
})).doOnError(e -> {
cleanup(e.getMessage());
});
stopCompletable.subscribe(closeSubject);
}
return Completable.complete();
return closeSubject;
}
private void cleanup(String error) {

View File

@ -3127,6 +3127,84 @@ class HubConnectionTest {
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void stopWithoutObservingWithLongPollingTransportStops() {
AtomicInteger requestCount = new AtomicInteger(0);
CompletableSubject blockGet = CompletableSubject.create();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1",
(req) -> Single.just(new HttpResponse(200, "",
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))))
.on("GET", (req) -> {
if (requestCount.getAndIncrement() > 1) {
blockGet.blockingAwait();
}
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR)));
})
.on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> {
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
});
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(TransportEnum.LONG_POLLING)
.withHttpClient(client)
.build();
CompletableSubject closed = CompletableSubject.create();
hubConnection.onClosed((e) -> {
closed.onComplete();
});
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
hubConnection.stop();
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
blockGet.onComplete();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithLongPolling() {
AtomicInteger requestCount = new AtomicInteger(0);
CompletableSubject blockGet = CompletableSubject.create();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1",
(req) -> Single.just(new HttpResponse(200, "",
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))))
.on("GET", (req) -> {
if (requestCount.getAndIncrement() > 1) {
blockGet.blockingAwait();
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"type\":7}" + RECORD_SEPARATOR)));
}
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR)));
})
.on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> {
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
});
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(TransportEnum.LONG_POLLING)
.withHttpClient(client)
.build();
CompletableSubject closed = CompletableSubject.create();
hubConnection.onClosed((ex) -> {
closed.onComplete();
});
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
blockGet.onComplete();
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}
@Test
public void receivingServerSentEventsTransportFromNegotiateFails() {
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate?negotiateVersion=1",

View File

@ -307,7 +307,7 @@ public class LongPollingTransportTest {
}
@Test
public void After204StopDoesNotTriggerOnClose() {
public void After204StopDoesNotTriggerOnCloseAgain() {
AtomicBoolean firstPoll = new AtomicBoolean(true);
CompletableSubject block = CompletableSubject.create();
TestHttpClient client = new TestHttpClient()
@ -317,6 +317,9 @@ public class LongPollingTransportTest {
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
}
return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer));
})
.on("DELETE", (req) -> {
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
});
Map<String, String> headers = new HashMap<>();
@ -356,7 +359,7 @@ public class LongPollingTransportTest {
})
.on("DELETE", (req) ->{
//Unblock the last poll when we sent the DELETE request.
block.onComplete();
block.onComplete();
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
});
@ -373,4 +376,25 @@ public class LongPollingTransportTest {
assertEquals(1, onCloseCount.get());
assertFalse(transport.isActive());
}
@Test
public void ErrorFromClosePropagatesOnSecondStopCall() {
AtomicBoolean firstPoll = new AtomicBoolean(true);
TestHttpClient client = new TestHttpClient()
.on("GET", (req) -> {
if (firstPoll.get()) {
firstPoll.set(false);
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
}
return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer));
});
Map<String, String> headers = new HashMap<>();
LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just(""));
transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait();
RuntimeException exception = assertThrows(RuntimeException.class, () -> transport.stop().blockingAwait(100, TimeUnit.SECONDS));
assertEquals("Request has no handler: DELETE http://example.com", exception.getMessage());
}
}