From 8e50db65f32dd71a24a1771cc0f295897ce2fdce Mon Sep 17 00:00:00 2001 From: Brennan Date: Wed, 8 Apr 2020 12:48:50 -0700 Subject: [PATCH] Cleanup resources in Java client (#20473) --- .../microsoft/signalr/DefaultHttpClient.java | 8 +++- .../com/microsoft/signalr/HttpClient.java | 4 +- .../com/microsoft/signalr/HubConnection.java | 32 ++++++++++++-- .../signalr/LongPollingTransport.java | 2 + .../microsoft/signalr/HubConnectionTest.java | 28 ++++++++++++- .../com/microsoft/signalr/TestHttpClient.java | 10 +++++ .../signalr/WebSocketTransportTest.java | 4 ++ .../com/microsoft/signalr/sample/Chat.java | 42 +++++++++---------- 8 files changed, 102 insertions(+), 28 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index a264f5d09f..58a95445c8 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -29,11 +29,17 @@ final class DefaultHttpClient extends HttpClient { return new DefaultHttpClient(timeoutInMilliseconds, newClient); } + @Override + public void close() { + if (this.client != null) { + this.client.dispatcher().executorService().shutdown(); + } + } + public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) { if (client != null) { this.client = client; } else { - OkHttpClient.Builder builder = new OkHttpClient.Builder().cookieJar(new CookieJar() { private List cookieList = new ArrayList<>(); private Lock cookieLock = new ReentrantLock(); diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java index 534457367f..97efaa9804 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java @@ -74,7 +74,7 @@ class HttpResponse { } } -abstract class HttpClient { +abstract class HttpClient implements AutoCloseable { public Single get(String url) { HttpRequest request = new HttpRequest(); request.setUrl(url); @@ -127,4 +127,6 @@ abstract class HttpClient { public abstract WebSocketWrapper createWebSocket(String url, Map headers); public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds); + + public abstract void close(); } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 5b04191f4c..5c884cfff1 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -24,13 +24,14 @@ import io.reactivex.subjects.*; /** * A connection used to invoke hub methods on a SignalR Server. */ -public class HubConnection { +public class HubConnection implements AutoCloseable { private static final String RECORD_SEPARATOR = "\u001e"; private static final List> emptyArray = new ArrayList<>(); private static final int MAX_NEGOTIATE_ATTEMPTS = 100; private String baseUrl; private Transport transport; + private boolean customTransport = false; private OnReceiveCallBack callback; private final CallbackMap handlers = new CallbackMap(); private HubProtocol protocol; @@ -59,6 +60,7 @@ public class HubConnection { private String connectionId; private final int negotiateVersion = 1; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); + private ScheduledExecutorService handshakeTimeout = null; /** * Sets the server timeout interval for the connection. @@ -111,7 +113,7 @@ public class HubConnection { } // For testing purposes - Map getStreamMap() { + Map getStreamMap() { return this.streamMap; } @@ -146,6 +148,7 @@ public class HubConnection { if (transport != null) { this.transport = transport; + this.customTransport = true; } else if (transportEnum != null) { this.transportEnum = transportEnum; } @@ -246,8 +249,8 @@ public class HubConnection { } private void timeoutHandshakeResponse(long timeout, TimeUnit unit) { - ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); - scheduledThreadPool.schedule(() -> { + handshakeTimeout = Executors.newSingleThreadScheduledExecutor(); + handshakeTimeout.schedule(() -> { // If onError is called on a completed subject the global error handler is called if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) { @@ -531,6 +534,15 @@ public class HubConnection { transportEnum = TransportEnum.ALL; this.localHeaders.clear(); this.streamMap.clear(); + + if (this.handshakeTimeout != null) { + this.handshakeTimeout.shutdownNow(); + this.handshakeTimeout = null; + } + + if (this.customTransport == false) { + this.transport = null; + } } finally { hubConnectionStateLock.unlock(); } @@ -1097,4 +1109,16 @@ public class HubConnection { return handlers.get(0).getClasses(); } } + + @Override + public void close() { + try { + stop().blockingAwait(); + } finally { + // Don't close HttpClient if it's passed in by the user + if (this.httpClient != null && this.httpClient instanceof DefaultHttpClient) { + this.httpClient.close(); + } + } + } } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 89fbdbcd86..32eaac2244 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -160,6 +160,8 @@ class LongPollingTransport implements Transport { CompletableSubject stopCompletableSubject = CompletableSubject.create(); return this.receiveLoop.andThen(Completable.defer(() -> { logger.info("LongPolling transport stopped."); + this.onReceiveThread.shutdown(); + this.threadPool.shutdown(); this.onClose.invoke(this.closeError); return Completable.complete(); })).subscribeWith(stopCompletableSubject); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index 52a6bf3487..fc7f86304c 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -2479,7 +2479,7 @@ class HubConnectionTest { } @Test - public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() { + public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() { MockTransport mockTransport = new MockTransport(); TestHttpClient client = new TestHttpClient() .on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) @@ -2519,4 +2519,30 @@ class HubConnectionTest { () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getMessage()); } + + @Test + public void hubConnectionCloseCallsStop() throws Exception { + MockTransport mockTransport = new MockTransport(); + TestHttpClient client = new TestHttpClient() + .on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) + .on("POST", "http://testexample.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + CompletableSubject close = CompletableSubject.create(); + + try (HubConnection hubConnection = HubConnectionBuilder + .create("http://example.com") + .withTransportImplementation(mockTransport) + .withHttpClient(client) + .build()) { + + hubConnection.onClosed(e -> { + close.onComplete(); + }); + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); + } + + close.timeout(1, TimeUnit.SECONDS).blockingGet(); + } } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java index eea0453504..ef3c27989f 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java @@ -12,6 +12,7 @@ import io.reactivex.Single; class TestHttpClient extends HttpClient { private TestHttpRequestHandler handler; private List sentRequests; + private boolean closeCalled; public TestHttpClient() { this.sentRequests = new ArrayList<>(); @@ -76,6 +77,15 @@ class TestHttpClient extends HttpClient { return this; } + @Override + public void close() { + this.closeCalled = true; + } + + public boolean getCloseCalled() { + return this.closeCalled; + } + interface TestHttpRequestHandler { Single invoke(HttpRequest request); } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java index a3a3595550..5edda35267 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java @@ -55,6 +55,10 @@ class WebSocketTransportTest { public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) { return null; } + + @Override + public void close() { + } } class TestWrapper extends WebSocketWrapper { diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java index 5201a09158..d48c898185 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java @@ -10,33 +10,33 @@ import com.microsoft.signalr.HubConnectionBuilder; public class Chat { - public static void main(String[] args) { + public static void main(final String[] args) throws Exception { System.out.println("Enter the URL of the SignalR Chat you want to join"); - Scanner reader = new Scanner(System.in); // Reading from System.in - String input = reader.nextLine(); + final Scanner reader = new Scanner(System.in); // Reading from System.in + final String input = reader.nextLine(); - HubConnection hubConnection = HubConnectionBuilder.create(input).build(); + try (HubConnection hubConnection = HubConnectionBuilder.create(input).build()) { + hubConnection.on("Send", (message) -> { + System.out.println(message); + }, String.class); - hubConnection.on("Send", (message) -> { - System.out.println(message); - }, String.class); + hubConnection.onClosed((ex) -> { + if (ex != null) { + System.out.printf("There was an error: %s", ex.getMessage()); + } + }); - hubConnection.onClosed((ex) -> { - if (ex != null) { - System.out.printf("There was an error: %s", ex.getMessage()); + //This is a blocking call + hubConnection.start().blockingAwait(); + + String message = ""; + while (!message.equals("leave")) { + // Scans the next token of the input as an int. + message = reader.nextLine(); + hubConnection.send("Send", message); } - }); - //This is a blocking call - hubConnection.start().blockingAwait(); - - String message = ""; - while (!message.equals("leave")) { - // Scans the next token of the input as an int. - message = reader.nextLine(); - hubConnection.send("Send", message); + hubConnection.stop().blockingAwait(); } - - hubConnection.stop().blockingAwait(); } }