Cleanup resources in Java client (#20473)

This commit is contained in:
Brennan 2020-04-08 12:48:50 -07:00 committed by GitHub
parent 1d56c516f8
commit 8e50db65f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 102 additions and 28 deletions

View File

@ -29,11 +29,17 @@ final class DefaultHttpClient extends HttpClient {
return new DefaultHttpClient(timeoutInMilliseconds, newClient); return new DefaultHttpClient(timeoutInMilliseconds, newClient);
} }
@Override
public void close() {
if (this.client != null) {
this.client.dispatcher().executorService().shutdown();
}
}
public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) { public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) {
if (client != null) { if (client != null) {
this.client = client; this.client = client;
} else { } else {
OkHttpClient.Builder builder = new OkHttpClient.Builder().cookieJar(new CookieJar() { OkHttpClient.Builder builder = new OkHttpClient.Builder().cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>(); private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock(); private Lock cookieLock = new ReentrantLock();

View File

@ -74,7 +74,7 @@ class HttpResponse {
} }
} }
abstract class HttpClient { abstract class HttpClient implements AutoCloseable {
public Single<HttpResponse> get(String url) { public Single<HttpResponse> get(String url) {
HttpRequest request = new HttpRequest(); HttpRequest request = new HttpRequest();
request.setUrl(url); request.setUrl(url);
@ -127,4 +127,6 @@ abstract class HttpClient {
public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers); public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers);
public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds); public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds);
public abstract void close();
} }

View File

@ -24,13 +24,14 @@ import io.reactivex.subjects.*;
/** /**
* A connection used to invoke hub methods on a SignalR Server. * A connection used to invoke hub methods on a SignalR Server.
*/ */
public class HubConnection { public class HubConnection implements AutoCloseable {
private static final String RECORD_SEPARATOR = "\u001e"; private static final String RECORD_SEPARATOR = "\u001e";
private static final List<Class<?>> emptyArray = new ArrayList<>(); private static final List<Class<?>> emptyArray = new ArrayList<>();
private static final int MAX_NEGOTIATE_ATTEMPTS = 100; private static final int MAX_NEGOTIATE_ATTEMPTS = 100;
private String baseUrl; private String baseUrl;
private Transport transport; private Transport transport;
private boolean customTransport = false;
private OnReceiveCallBack callback; private OnReceiveCallBack callback;
private final CallbackMap handlers = new CallbackMap(); private final CallbackMap handlers = new CallbackMap();
private HubProtocol protocol; private HubProtocol protocol;
@ -59,6 +60,7 @@ public class HubConnection {
private String connectionId; private String connectionId;
private final int negotiateVersion = 1; private final int negotiateVersion = 1;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class); private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
private ScheduledExecutorService handshakeTimeout = null;
/** /**
* Sets the server timeout interval for the connection. * Sets the server timeout interval for the connection.
@ -111,7 +113,7 @@ public class HubConnection {
} }
// For testing purposes // For testing purposes
Map<String,Observable> getStreamMap() { Map<String, Observable> getStreamMap() {
return this.streamMap; return this.streamMap;
} }
@ -146,6 +148,7 @@ public class HubConnection {
if (transport != null) { if (transport != null) {
this.transport = transport; this.transport = transport;
this.customTransport = true;
} else if (transportEnum != null) { } else if (transportEnum != null) {
this.transportEnum = transportEnum; this.transportEnum = transportEnum;
} }
@ -246,8 +249,8 @@ public class HubConnection {
} }
private void timeoutHandshakeResponse(long timeout, TimeUnit unit) { private void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); handshakeTimeout = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.schedule(() -> { handshakeTimeout.schedule(() -> {
// If onError is called on a completed subject the global error handler is called // If onError is called on a completed subject the global error handler is called
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable()))
{ {
@ -531,6 +534,15 @@ public class HubConnection {
transportEnum = TransportEnum.ALL; transportEnum = TransportEnum.ALL;
this.localHeaders.clear(); this.localHeaders.clear();
this.streamMap.clear(); this.streamMap.clear();
if (this.handshakeTimeout != null) {
this.handshakeTimeout.shutdownNow();
this.handshakeTimeout = null;
}
if (this.customTransport == false) {
this.transport = null;
}
} finally { } finally {
hubConnectionStateLock.unlock(); hubConnectionStateLock.unlock();
} }
@ -1097,4 +1109,16 @@ public class HubConnection {
return handlers.get(0).getClasses(); return handlers.get(0).getClasses();
} }
} }
@Override
public void close() {
try {
stop().blockingAwait();
} finally {
// Don't close HttpClient if it's passed in by the user
if (this.httpClient != null && this.httpClient instanceof DefaultHttpClient) {
this.httpClient.close();
}
}
}
} }

View File

@ -160,6 +160,8 @@ class LongPollingTransport implements Transport {
CompletableSubject stopCompletableSubject = CompletableSubject.create(); CompletableSubject stopCompletableSubject = CompletableSubject.create();
return this.receiveLoop.andThen(Completable.defer(() -> { return this.receiveLoop.andThen(Completable.defer(() -> {
logger.info("LongPolling transport stopped."); logger.info("LongPolling transport stopped.");
this.onReceiveThread.shutdown();
this.threadPool.shutdown();
this.onClose.invoke(this.closeError); this.onClose.invoke(this.closeError);
return Completable.complete(); return Completable.complete();
})).subscribeWith(stopCompletableSubject); })).subscribeWith(stopCompletableSubject);

View File

@ -2479,7 +2479,7 @@ class HubConnectionTest {
} }
@Test @Test
public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() { public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() {
MockTransport mockTransport = new MockTransport(); MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient() TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) .on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
@ -2519,4 +2519,30 @@ class HubConnectionTest {
() -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait());
assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getMessage()); assertEquals("Unexpected status code returned from negotiate: 500 Internal server error.", exception.getMessage());
} }
@Test
public void hubConnectionCloseCallsStop() throws Exception {
MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))
.on("POST", "http://testexample.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
CompletableSubject close = CompletableSubject.create();
try (HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransportImplementation(mockTransport)
.withHttpClient(client)
.build()) {
hubConnection.onClosed(e -> {
close.onComplete();
});
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
}
close.timeout(1, TimeUnit.SECONDS).blockingGet();
}
} }

View File

@ -12,6 +12,7 @@ import io.reactivex.Single;
class TestHttpClient extends HttpClient { class TestHttpClient extends HttpClient {
private TestHttpRequestHandler handler; private TestHttpRequestHandler handler;
private List<HttpRequest> sentRequests; private List<HttpRequest> sentRequests;
private boolean closeCalled;
public TestHttpClient() { public TestHttpClient() {
this.sentRequests = new ArrayList<>(); this.sentRequests = new ArrayList<>();
@ -76,6 +77,15 @@ class TestHttpClient extends HttpClient {
return this; return this;
} }
@Override
public void close() {
this.closeCalled = true;
}
public boolean getCloseCalled() {
return this.closeCalled;
}
interface TestHttpRequestHandler { interface TestHttpRequestHandler {
Single<HttpResponse> invoke(HttpRequest request); Single<HttpResponse> invoke(HttpRequest request);
} }

View File

@ -55,6 +55,10 @@ class WebSocketTransportTest {
public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) { public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
return null; return null;
} }
@Override
public void close() {
}
} }
class TestWrapper extends WebSocketWrapper { class TestWrapper extends WebSocketWrapper {

View File

@ -10,33 +10,33 @@ import com.microsoft.signalr.HubConnectionBuilder;
public class Chat { public class Chat {
public static void main(String[] args) { public static void main(final String[] args) throws Exception {
System.out.println("Enter the URL of the SignalR Chat you want to join"); System.out.println("Enter the URL of the SignalR Chat you want to join");
Scanner reader = new Scanner(System.in); // Reading from System.in final Scanner reader = new Scanner(System.in); // Reading from System.in
String input = reader.nextLine(); final String input = reader.nextLine();
HubConnection hubConnection = HubConnectionBuilder.create(input).build(); try (HubConnection hubConnection = HubConnectionBuilder.create(input).build()) {
hubConnection.on("Send", (message) -> {
System.out.println(message);
}, String.class);
hubConnection.on("Send", (message) -> { hubConnection.onClosed((ex) -> {
System.out.println(message); if (ex != null) {
}, String.class); System.out.printf("There was an error: %s", ex.getMessage());
}
});
hubConnection.onClosed((ex) -> { //This is a blocking call
if (ex != null) { hubConnection.start().blockingAwait();
System.out.printf("There was an error: %s", ex.getMessage());
String message = "";
while (!message.equals("leave")) {
// Scans the next token of the input as an int.
message = reader.nextLine();
hubConnection.send("Send", message);
} }
});
//This is a blocking call hubConnection.stop().blockingAwait();
hubConnection.start().blockingAwait();
String message = "";
while (!message.equals("leave")) {
// Scans the next token of the input as an int.
message = reader.nextLine();
hubConnection.send("Send", message);
} }
hubConnection.stop().blockingAwait();
} }
} }