diff --git a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java index 0aa4e4c336..e67bdacc88 100644 --- a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java @@ -527,7 +527,9 @@ public class HubConnection implements AutoCloseable { hubConnectionStateLock.unlock(); } - return transport.stop(); + Completable stop = transport.stop(); + stop.onErrorComplete().subscribe(); + return stop; } /** diff --git a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 00dc46f97b..d52d5edb38 100644 --- a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -19,7 +19,7 @@ import io.reactivex.subjects.CompletableSubject; class LongPollingTransport implements Transport { private OnReceiveCallBack onReceiveCallBack; - private TransportOnClosedCallback onClose; + private TransportOnClosedCallback onClose = (reason) -> {}; private String url; private final HttpClient client; private final HttpClient pollingClient; @@ -30,6 +30,7 @@ class LongPollingTransport implements Transport { private String pollUrl; private String closeError; private CompletableSubject receiveLoop = CompletableSubject.create(); + private CompletableSubject closeSubject = CompletableSubject.create(); private ExecutorService threadPool; private ExecutorService onReceiveThread; private AtomicBoolean stopCalled = new AtomicBoolean(false); @@ -157,7 +158,7 @@ class LongPollingTransport implements Transport { public Completable stop() { if (stopCalled.compareAndSet(false, true)) { this.active = false; - return this.updateHeaderToken().andThen(Completable.defer(() -> { + Completable stopCompletable = this.updateHeaderToken().andThen(Completable.defer(() -> { HttpRequest request = new HttpRequest(); request.addHeaders(headers); return this.pollingClient.delete(this.url, request).ignoreElement() @@ -168,8 +169,10 @@ class LongPollingTransport implements Transport { })).doOnError(e -> { cleanup(e.getMessage()); }); + + stopCompletable.subscribe(closeSubject); } - return Completable.complete(); + return closeSubject; } private void cleanup(String error) { diff --git a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java index 402a21c74b..57c0c8c9b3 100644 --- a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java @@ -3127,6 +3127,84 @@ class HubConnectionTest { assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } + @Test + public void stopWithoutObservingWithLongPollingTransportStops() { + AtomicInteger requestCount = new AtomicInteger(0); + CompletableSubject blockGet = CompletableSubject.create(); + TestHttpClient client = new TestHttpClient() + .on("POST", "http://example.com/negotiate?negotiateVersion=1", + (req) -> Single.just(new HttpResponse(200, "", + TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")))) + .on("GET", (req) -> { + if (requestCount.getAndIncrement() > 1) { + blockGet.blockingAwait(); + } + return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR))); + }) + .on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> { + return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer(""))); + }); + + HubConnection hubConnection = HubConnectionBuilder + .create("http://example.com") + .withTransport(TransportEnum.LONG_POLLING) + .withHttpClient(client) + .build(); + + CompletableSubject closed = CompletableSubject.create(); + hubConnection.onClosed((e) -> { + closed.onComplete(); + }); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + hubConnection.stop(); + closed.timeout(1, TimeUnit.SECONDS).blockingAwait(); + blockGet.onComplete(); + assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); + } + + @Test + public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithLongPolling() { + AtomicInteger requestCount = new AtomicInteger(0); + CompletableSubject blockGet = CompletableSubject.create(); + TestHttpClient client = new TestHttpClient() + .on("POST", "http://example.com/negotiate?negotiateVersion=1", + (req) -> Single.just(new HttpResponse(200, "", + TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")))) + .on("GET", (req) -> { + if (requestCount.getAndIncrement() > 1) { + blockGet.blockingAwait(); + return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"type\":7}" + RECORD_SEPARATOR))); + } + return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR))); + }) + .on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> { + return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer(""))); + }); + + HubConnection hubConnection = HubConnectionBuilder + .create("http://example.com") + .withTransport(TransportEnum.LONG_POLLING) + .withHttpClient(client) + .build(); + + CompletableSubject closed = CompletableSubject.create(); + hubConnection.onClosed((ex) -> { + closed.onComplete(); + }); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); + blockGet.onComplete(); + + closed.timeout(1, TimeUnit.SECONDS).blockingAwait(); + + assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); + } + @Test public void receivingServerSentEventsTransportFromNegotiateFails() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate?negotiateVersion=1", diff --git a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/LongPollingTransportTest.java index 0e65390d39..6ee518d415 100644 --- a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -307,7 +307,7 @@ public class LongPollingTransportTest { } @Test - public void After204StopDoesNotTriggerOnClose() { + public void After204StopDoesNotTriggerOnCloseAgain() { AtomicBoolean firstPoll = new AtomicBoolean(true); CompletableSubject block = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() @@ -317,6 +317,9 @@ public class LongPollingTransportTest { return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer)); } return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer)); + }) + .on("DELETE", (req) -> { + return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer)); }); Map headers = new HashMap<>(); @@ -356,7 +359,7 @@ public class LongPollingTransportTest { }) .on("DELETE", (req) ->{ //Unblock the last poll when we sent the DELETE request. - block.onComplete(); + block.onComplete(); return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer)); }); @@ -373,4 +376,25 @@ public class LongPollingTransportTest { assertEquals(1, onCloseCount.get()); assertFalse(transport.isActive()); } + + @Test + public void ErrorFromClosePropagatesOnSecondStopCall() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer)); + } + return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer)); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + + RuntimeException exception = assertThrows(RuntimeException.class, () -> transport.stop().blockingAwait(100, TimeUnit.SECONDS)); + assertEquals("Request has no handler: DELETE http://example.com", exception.getMessage()); + } }