Clear internal stream map on connection/stream close. (#12212)

This commit is contained in:
Mikael Mengistu 2019-07-17 17:32:31 -07:00 committed by GitHub
parent c8b6bb3f41
commit 7d08882031
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 3 deletions

View File

@ -109,6 +109,11 @@ public class HubConnection {
this.tickRate = tickRateInMilliseconds;
}
// For testing purposes
Map<String,Observable> getStreamMap() {
return this.streamMap;
}
TransportEnum getTransportEnum() {
return this.transportEnum;
}
@ -517,6 +522,7 @@ public class HubConnection {
connectionId = null;
transportEnum = TransportEnum.ALL;
this.localHeaders.clear();
this.streamMap.clear();
} finally {
hubConnectionStateLock.unlock();
}
@ -575,8 +581,14 @@ public class HubConnection {
Observable observable = this.streamMap.get(streamId);
observable.subscribe(
(item) -> sendHubMessage(new StreamItem(streamId, item)),
(error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())),
() -> sendHubMessage(new CompletionMessage(streamId, null, null)));
(error) -> {
sendHubMessage(new CompletionMessage(streamId, null, error.toString()));
this.streamMap.remove(streamId);
},
() -> {
sendHubMessage(new CompletionMessage(streamId, null, null));
this.streamMap.remove(streamId);
});
}
}
@ -599,7 +611,6 @@ public class HubConnection {
return params.toArray();
}
/**
* Invokes a hub method on the server using the specified method name and arguments.
*

View File

@ -515,6 +515,74 @@ class HubConnectionTest {
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]);
}
@Test
public void streamMapIsClearedOnClose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream, 12);
stream.onNext("FirstItem");
String[] messages = mockTransport.getSentMessages();
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]);
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]);
stream.onComplete();
messages = mockTransport.getSentMessages();
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getStreamMap().isEmpty());
}
@Test
public void streamMapEntriesRemovedOnStreamClose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream, 12);
ReplaySubject<String> secondStream = ReplaySubject.create();
hubConnection.send("SecondUploadStream", secondStream, 13);
stream.onNext("FirstItem");
secondStream.onNext("SecondItem");
String[] messages = mockTransport.getSentMessages();
assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]);
assertEquals("{\"type\":1,\"target\":\"SecondUploadStream\",\"arguments\":[13],\"streamIds\":[\"2\"]}\u001E", messages[2]);
assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[3]);
assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"SecondItem\"}\u001E", messages[4]);
assertEquals(2, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().keySet().contains("1"));
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
// Verify that we clear the entry from the stream map after we clear the first stream.
stream.onComplete();
assertEquals(1, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().keySet().contains("2"));
secondStream.onError(new Exception("Exception"));
assertEquals(0, hubConnection.getStreamMap().size());
assertTrue(hubConnection.getStreamMap().isEmpty());
messages = mockTransport.getSentMessages();
assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[5]);
assertEquals("{\"type\":3,\"invocationId\":\"2\",\"error\":\"java.lang.Exception: Exception\"}\u001E", messages[6]);
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
assertTrue(hubConnection.getStreamMap().isEmpty());
}
@Test
public void useSameSubjectMultipleTimes() {
MockTransport mockTransport = new MockTransport();

View File

@ -12,6 +12,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
class WebSocketTransportUrlFormatTest {
private static Stream<Arguments> protocols() {
return Stream.of(