diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index a4d1e58c7d..aa333fb508 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -109,6 +109,11 @@ public class HubConnection { this.tickRate = tickRateInMilliseconds; } + // For testing purposes + Map getStreamMap() { + return this.streamMap; + } + TransportEnum getTransportEnum() { return this.transportEnum; } @@ -517,6 +522,7 @@ public class HubConnection { connectionId = null; transportEnum = TransportEnum.ALL; this.localHeaders.clear(); + this.streamMap.clear(); } finally { hubConnectionStateLock.unlock(); } @@ -575,8 +581,14 @@ public class HubConnection { Observable observable = this.streamMap.get(streamId); observable.subscribe( (item) -> sendHubMessage(new StreamItem(streamId, item)), - (error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())), - () -> sendHubMessage(new CompletionMessage(streamId, null, null))); + (error) -> { + sendHubMessage(new CompletionMessage(streamId, null, error.toString())); + this.streamMap.remove(streamId); + }, + () -> { + sendHubMessage(new CompletionMessage(streamId, null, null)); + this.streamMap.remove(streamId); + }); } } @@ -599,7 +611,6 @@ public class HubConnection { return params.toArray(); } - /** * Invokes a hub method on the server using the specified method name and arguments. * diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index 0ff3be7fd0..f28adf13e7 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -515,6 +515,74 @@ class HubConnectionTest { assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]); } + @Test + public void streamMapIsClearedOnClose() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream, 12); + + stream.onNext("FirstItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]); + + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + assertTrue(hubConnection.getStreamMap().isEmpty()); + } + + @Test + public void streamMapEntriesRemovedOnStreamClose() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream, 12); + + ReplaySubject secondStream = ReplaySubject.create(); + hubConnection.send("SecondUploadStream", secondStream, 13); + + + stream.onNext("FirstItem"); + secondStream.onNext("SecondItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]); + assertEquals("{\"type\":1,\"target\":\"SecondUploadStream\",\"arguments\":[13],\"streamIds\":[\"2\"]}\u001E", messages[2]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[3]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"SecondItem\"}\u001E", messages[4]); + + + assertEquals(2, hubConnection.getStreamMap().size()); + assertTrue(hubConnection.getStreamMap().keySet().contains("1")); + assertTrue(hubConnection.getStreamMap().keySet().contains("2")); + + // Verify that we clear the entry from the stream map after we clear the first stream. + stream.onComplete(); + assertEquals(1, hubConnection.getStreamMap().size()); + assertTrue(hubConnection.getStreamMap().keySet().contains("2")); + + secondStream.onError(new Exception("Exception")); + assertEquals(0, hubConnection.getStreamMap().size()); + assertTrue(hubConnection.getStreamMap().isEmpty()); + + messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[5]); + assertEquals("{\"type\":3,\"invocationId\":\"2\",\"error\":\"java.lang.Exception: Exception\"}\u001E", messages[6]); + + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertTrue(hubConnection.getStreamMap().isEmpty()); + } + @Test public void useSameSubjectMultipleTimes() { MockTransport mockTransport = new MockTransport(); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java index 1dbe653dcc..613ffab959 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportUrlFormatTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; + class WebSocketTransportUrlFormatTest { private static Stream protocols() { return Stream.of(