[Java] Add pings and server timeout (#3027)

This commit is contained in:
BrennanConroy 2018-10-10 13:49:22 -07:00 committed by GitHub
parent be4fe6c6f9
commit 2ee351786f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 40 deletions

View File

@ -6,11 +6,15 @@ package com.microsoft.signalr;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -39,9 +43,36 @@ public class HubConnection {
private ConnectionState connectionState = null;
private HttpClient httpClient;
private String stopError;
private Timer pingTimer = null;
private AtomicLong nextServerTimeout = new AtomicLong();
private AtomicLong nextPingActivation = new AtomicLong();
private Duration keepAliveInterval = Duration.ofSeconds(15);
private Duration serverTimeout = Duration.ofSeconds(30);
private Duration tickRate = Duration.ofSeconds(1);
private CompletableFuture<Void> handshakeResponseFuture;
private Duration handshakeResponseTimeout = Duration.ofSeconds(15);
public void setServerTimeout(Duration serverTimeout) {
this.serverTimeout = serverTimeout;
}
public Duration getServerTimeout() {
return this.serverTimeout;
}
public void setKeepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}
public Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
// For testing purposes
void setTickRate(Duration tickRate) {
this.tickRate = tickRate;
}
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider, Duration handshakeResponseTimeout) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
@ -79,6 +110,7 @@ public class HubConnection {
this.skipNegotiate = skipNegotiate;
this.callback = (payload) -> {
resetServerTimeout();
if (!handshakeReceived) {
int handshakeLength = payload.indexOf(RECORD_SEPARATOR) + 1;
String handshakeResponseString = payload.substring(0, handshakeLength - 1);
@ -245,6 +277,29 @@ public class HubConnection {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
resetServerTimeout();
this.pingTimer = new Timer();
this.pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() > nextServerTimeout.get()) {
stop("Server timeout elapsed without receiving a message from the server.");
return;
}
if (System.currentTimeMillis() > nextPingActivation.get()) {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.log(LogLevel.Warning, String.format("Error sending ping: %s", e.getMessage()));
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
}
}
}, new Date(0), tickRate.toMillis());
} finally {
hubConnectionStateLock.unlock();
}
@ -399,11 +454,21 @@ public class HubConnection {
private void sendHubMessage(HubMessage message) throws Exception {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
logger.log(LogLevel.Debug, "Sending %d message '%s'.", message.getMessageType().value, ((InvocationMessage)message).getInvocationId());
logger.log(LogLevel.Debug, "Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
} else {
logger.log(LogLevel.Debug, "Sending %d message.", message.getMessageType().value);
logger.log(LogLevel.Debug, "Sending %s message.", message.getMessageType().name());
}
transport.send(serializedMessage);
resetKeepAlive();
}
private void resetServerTimeout() {
this.nextServerTimeout.set(System.currentTimeMillis() + serverTimeout.toMillis());
}
private void resetKeepAlive() {
this.nextPingActivation.set(System.currentTimeMillis() + keepAliveInterval.toMillis());
}
/**

View File

@ -5,6 +5,8 @@ package com.microsoft.signalr;
class PingMessage extends HubMessage
{
int type = HubMessageType.PING.value;
private static PingMessage instance = new PingMessage();
private PingMessage()

View File

@ -35,7 +35,7 @@ class HubConnectionTest {
@Test
public void transportCloseTriggersStopInHubConnection() throws Exception {
MockTransport mockTransport = new MockTransport(true);
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -46,7 +46,7 @@ class HubConnectionTest {
@Test
public void transportCloseWithErrorTriggersStopInHubConnection() throws Exception {
MockTransport mockTransport = new MockTransport(true);
MockTransport mockTransport = new MockTransport();
AtomicReference<String> message = new AtomicReference<>();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
String errorMessage = "Example transport error.";
@ -63,7 +63,7 @@ class HubConnectionTest {
@Test
public void checkHubConnectionStateNoHandShakeResponse() {
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = HubConnectionBuilder.create("http://example.com")
.withTransport(mockTransport)
.withHttpClient(new TestHttpClient())
@ -79,7 +79,7 @@ class HubConnectionTest {
@Test
public void constructHubConnectionWithHttpConnectionOptions() {
Transport mockTransport = new MockTransport(true);
Transport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
@ -95,7 +95,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -106,7 +105,7 @@ class HubConnectionTest {
@Test
public void invalidHandShakeResponse() throws Exception {
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
@ -118,7 +117,7 @@ class HubConnectionTest {
@Test
public void hubConnectionReceiveHandshakeResponseWithError() {
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
@ -145,7 +144,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -169,7 +167,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -197,7 +194,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that the handler was removed.
@ -223,7 +219,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
assertEquals(Double.valueOf(3), value.get());
@ -253,7 +248,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -286,7 +280,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -322,7 +315,6 @@ class HubConnectionTest {
assertEquals(expectedHanshakeRequest, message);
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
assertEquals(Double.valueOf(3), value.get());
@ -346,7 +338,6 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
try {
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
@ -371,7 +362,6 @@ class HubConnectionTest {
assertEquals(Double.valueOf(0), value.get());
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -384,7 +374,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
@ -403,7 +392,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean doneFirst = new AtomicBoolean();
AtomicBoolean doneSecond = new AtomicBoolean();
@ -430,7 +418,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
// int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type
@ -450,7 +437,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -476,7 +462,6 @@ class HubConnectionTest {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
AtomicBoolean done = new AtomicBoolean();
Single<Integer> result = hubConnection.invoke(int.class, "echo", "message");
@ -508,7 +493,6 @@ class HubConnectionTest {
});
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and that the counter property was incremented.
@ -527,7 +511,6 @@ class HubConnectionTest {
}, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World");
@ -552,7 +535,6 @@ class HubConnectionTest {
}, String.class, Double.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\", 12]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "Hello World", 12);
@ -581,7 +563,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\"]}" + RECORD_SEPARATOR);
hubConnection.send("inc", "A", "B", "C");
@ -614,7 +595,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\", \"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -650,7 +630,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12 ]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -690,7 +669,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -734,7 +712,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -782,7 +759,6 @@ class HubConnectionTest {
}, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class, String.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\",\"F\"]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
assertEquals("A", value1.get());
@ -815,7 +791,6 @@ class HubConnectionTest {
}, Custom.class);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[{\"number\":1,\"str\":\"A\",\"bools\":[true,false]}]}" + RECORD_SEPARATOR);
// Confirming that our handler was called and the correct message was passed in.
@ -830,7 +805,7 @@ class HubConnectionTest {
@Test
public void receiveHandshakeResponseAndMessage() throws Exception {
AtomicReference<Double> value = new AtomicReference<Double>(0.0);
MockTransport mockTransport = new MockTransport();
MockTransport mockTransport = new MockTransport(false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.on("inc", () ->{
@ -898,7 +873,6 @@ class HubConnectionTest {
assertEquals(ex.getMessage(), "There was an error");
});
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
@ -938,7 +912,6 @@ class HubConnectionTest {
}, String.class);
Completable startFuture = hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS);
RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR));
@ -1091,9 +1064,43 @@ class HubConnectionTest {
assertEquals("Bearer newToken", token.get());
}
@Test
public void connectionTimesOutIfServerDoesNotSendMessage() throws Exception {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
hubConnection.setServerTimeout(Duration.ofMillis(1));
hubConnection.setTickRate(Duration.ofMillis(1));
CompletableFuture<Exception> closedFuture = new CompletableFuture<>();
hubConnection.onClosed((e) -> {
closedFuture.complete(e);
});
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
assertEquals("Server timeout elapsed without receiving a message from the server.", closedFuture.get(1000, TimeUnit.MILLISECONDS).getMessage());
}
@Test
public void connectionSendsPingsRegularly() throws InterruptedException, ExecutionException, TimeoutException, Exception {
MockTransport mockTransport = new MockTransport(true, false);
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.setKeepAliveInterval(Duration.ofMillis(1));
hubConnection.setTickRate(Duration.ofMillis(1));
hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(100);
hubConnection.stop();
String[] sentMessages = mockTransport.getSentMessages();
assertTrue(sentMessages.length > 1);
for (int i = 1; i < sentMessages.length; i++) {
assertEquals("{\"type\":6}" + RECORD_SEPARATOR, sentMessages[i]);
}
}
@Test
public void hubConnectionCanBeStartedAfterBeingStopped() throws Exception {
MockTransport transport = new MockTransport(true);
MockTransport transport = new MockTransport();
HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransport(transport)
@ -1112,7 +1119,7 @@ class HubConnectionTest {
@Test
public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() throws Exception {
MockTransport mockTransport = new MockTransport(true);
MockTransport mockTransport = new MockTransport();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> CompletableFuture
.completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}")))

View File

@ -12,15 +12,22 @@ class MockTransport implements Transport {
private ArrayList<String> sentMessages = new ArrayList<>();
private String url;
private Consumer<String> onClose;
private boolean autoHandshake;
final private boolean ignorePings;
final private boolean autoHandshake;
private static final String RECORD_SEPARATOR = "\u001e";
public MockTransport() {
this(true, true);
}
public MockTransport(boolean autoHandshake) {
this(autoHandshake, true);
}
public MockTransport(boolean autoHandshake, boolean ignorePings) {
this.autoHandshake = autoHandshake;
this.ignorePings = ignorePings;
}
@Override
@ -38,7 +45,9 @@ class MockTransport implements Transport {
@Override
public CompletableFuture send(String message) {
sentMessages.add(message);
if (!(ignorePings && message.equals("{\"type\":6}" + RECORD_SEPARATOR))) {
sentMessages.add(message);
}
return CompletableFuture.completedFuture(null);
}