From b1f828e161c32d2c16ffcf487eb3aecbc72576d5 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Thu, 7 Mar 2019 09:34:28 -0800 Subject: [PATCH] Client to Sever Streaming Java Edition (#8222) --- .../com/microsoft/signalr/HubConnection.java | 60 +++- .../microsoft/signalr/InvocationMessage.java | 12 +- .../signalr/StreamInvocationMessage.java | 26 +- .../microsoft/signalr/HubConnectionTest.java | 277 +++++++++++++++++- 4 files changed, 346 insertions(+), 29 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 391ad158a9..faf8ea2b27 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -3,6 +3,7 @@ package com.microsoft.signalr; +import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -49,6 +50,7 @@ public class HubConnection { private long tickRate = 1000; private CompletableSubject handshakeResponseSubject; private long handshakeResponseTimeout = 15*1000; + private Map streamMap = new ConcurrentHashMap<>(); private TransportEnum transportEnum = TransportEnum.ALL; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); @@ -495,8 +497,54 @@ public class HubConnection { throw new RuntimeException("The 'send' method cannot be called if the connection is not active."); } - InvocationMessage invocationMessage = new InvocationMessage(null, method, args); + sendInvocationMessage(method, args); + } + + private void sendInvocationMessage(String method, Object[] args) { + sendInvocationMessage(method, args, null, false); + } + + private void sendInvocationMessage(String method, Object[] args, String id, Boolean isStreamInvocation) { + List streamIds = new ArrayList<>(); + args = checkUploadStream(args, streamIds); + InvocationMessage invocationMessage; + if (isStreamInvocation) { + invocationMessage = new StreamInvocationMessage(id, method, args, streamIds); + } else { + invocationMessage = new InvocationMessage(id, method, args, streamIds); + } + sendHubMessage(invocationMessage); + launchStreams(streamIds); + } + + void launchStreams(List streamIds) { + if (streamMap.isEmpty()) { + return; + } + + for (String streamId: streamIds) { + Observable observable = this.streamMap.get(streamId); + observable.subscribe( + (item) -> sendHubMessage(new StreamItem(streamId, item)), + (error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())), + () -> sendHubMessage(new CompletionMessage(streamId, null, null))); + } + } + + Object[] checkUploadStream(Object[] args, List streamIds) { + List params = new ArrayList<>(Arrays.asList(args)); + for (Object arg: args) { + if(arg instanceof Observable) { + params.remove(arg); + Observable stream = (Observable)arg; + String streamId = connectionState.getNextInvocationId(); + streamIds.add(streamId); + this.streamMap.put(streamId, stream); + } + } + + return params.toArray(); } /** @@ -515,7 +563,6 @@ public class HubConnection { } String id = connectionState.getNextInvocationId(); - InvocationMessage invocationMessage = new InvocationMessage(id, method, args); SingleSubject subject = SingleSubject.create(); InvocationRequest irq = new InvocationRequest(returnType, id); @@ -535,8 +582,7 @@ public class HubConnection { // Make sure the actual send is after setting up the callbacks otherwise there is a race // where the map doesn't have the callbacks yet when the response is returned - sendHubMessage(invocationMessage); - + sendInvocationMessage(method, args, id, false); return subject; } @@ -553,7 +599,6 @@ public class HubConnection { public Observable stream(Class returnType, String method, Object ... args) { String invocationId = connectionState.getNextInvocationId(); AtomicInteger subscriptionCount = new AtomicInteger(); - StreamInvocationMessage streamInvocationMessage = new StreamInvocationMessage(invocationId, method, args); InvocationRequest irq = new InvocationRequest(returnType, invocationId); connectionState.addInvocation(irq); ReplaySubject subject = ReplaySubject.create(); @@ -569,9 +614,8 @@ public class HubConnection { }, error -> subject.onError(error), () -> subject.onComplete()); - sendHubMessage(streamInvocationMessage); Observable observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet()); - + sendInvocationMessage(method, args, invocationId, true); return observable.doOnDispose(() -> { if (subscriptionCount.decrementAndGet() == 0) { CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); @@ -591,8 +635,8 @@ public class HubConnection { } else { logger.debug("Sending {} message.", message.getMessageType().name()); } - transport.send(serializedMessage).subscribeWith(CompletableSubject.create()); + transport.send(serializedMessage).subscribeWith(CompletableSubject.create()); resetKeepAlive(); } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationMessage.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationMessage.java index 8d728cfaba..d62c1d4a97 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationMessage.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationMessage.java @@ -3,16 +3,26 @@ package com.microsoft.signalr; +import java.util.Collection; + class InvocationMessage extends HubMessage { - private final int type = HubMessageType.INVOCATION.value; + int type = HubMessageType.INVOCATION.value; private final String invocationId; private final String target; private final Object[] arguments; + private Collection streamIds; public InvocationMessage(String invocationId, String target, Object[] args) { + this(invocationId, target, args, null); + } + + public InvocationMessage(String invocationId, String target, Object[] args, Collection streamIds) { this.invocationId = invocationId; this.target = target; this.arguments = args; + if(streamIds != null && !streamIds.isEmpty()) { + this.streamIds = streamIds; + } } public String getInvocationId() { diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java index f897195667..046ec60036 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java @@ -3,28 +3,18 @@ package com.microsoft.signalr; -final class StreamInvocationMessage extends HubMessage { - private final int type = HubMessageType.STREAM_INVOCATION.value; - private final String invocationId; - private final String target; - private final Object[] arguments; +import java.util.Collection; + +final class StreamInvocationMessage extends InvocationMessage { public StreamInvocationMessage(String invocationId, String target, Object[] args) { - this.invocationId = invocationId; - this.target = target; - this.arguments = args; + super(invocationId, target, args); + super.type = HubMessageType.STREAM_INVOCATION.value; } - public String getInvocationId() { - return invocationId; - } - - public String getTarget() { - return target; - } - - public Object[] getArguments() { - return arguments; + public StreamInvocationMessage(String invocationId, String target, Object[] args, Collection streamIds) { + super(invocationId, target, args, streamIds); + super.type = HubMessageType.STREAM_INVOCATION.value; } @Override diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index f576c1c828..f316e10329 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -14,7 +14,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.subjects.CompletableSubject; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.ReplaySubject; import org.junit.jupiter.api.Test; import io.reactivex.Observable; @@ -371,6 +372,279 @@ class HubConnectionTest { assertEquals(Double.valueOf(24), value.get()); } + @Test + public void checkStreamUploadSingleItemThroughSend() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream); + + stream.onNext("FirstItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]); + } + + @Test + public void checkStreamUploadMultipleStreamsThroughSend() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject firstStream = ReplaySubject.create(); + ReplaySubject secondStream = ReplaySubject.create(); + + hubConnection.send("UploadStream", firstStream, secondStream); + + firstStream.onNext("First Stream 1"); + secondStream.onNext("Second Stream 1"); + String[] messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"First Stream 1\"}\u001E", messages[2]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"Second Stream 1\"}\u001E", messages[3]); + + firstStream.onComplete(); + secondStream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(6, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[4]); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[5]); + } + + @Test + public void checkStreamUploadThroughSendWithArgs() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream, 12); + + stream.onNext("FirstItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[12],\"streamIds\":[\"1\"]}\u001E", messages[1]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[3]); + } + + @Test + public void useSameSubjectMultipleTimes() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream, stream); + + stream.onNext("FirstItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"1\",\"2\"]}\u001E", messages[1]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"FirstItem\"}\u001E", messages[3]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(6, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[4]); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[5]); + } + + @Test + public void checkStreamUploadSingleItemThroughInvoke() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.invoke(String.class, "UploadStream", stream); + + stream.onNext("FirstItem"); + String[] messages = mockTransport.getSentMessages(); + assertEquals(3, messages.length); + assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"2\"]}\u001E", messages[1]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"FirstItem\"}\u001E", messages[2]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[3]); + } + + @Test + public void checkStreamUploadSingleItemThroughStream() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.stream(String.class, "UploadStream", stream); + + stream.onNext("FirstItem"); + + String[] messages = mockTransport.getSentMessages(); + assertEquals(3, messages.length); + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"2\"]}\u001E", messages[1]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"FirstItem\"}\u001E", messages[2]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[3]); + } + + @Test + public void useSameSubjectInMutlipleStreamsFromDifferentMethods() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream); + hubConnection.invoke(String.class, "UploadStream", stream); + hubConnection.stream(String.class, "UploadStream", stream); + + String[] messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":1,\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"1\"]}\u001E", messages[1]); + assertEquals("{\"type\":1,\"invocationId\":\"2\",\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"3\"]}\u001E", messages[2]); + assertEquals("{\"type\":4,\"invocationId\":\"4\",\"target\":\"UploadStream\",\"arguments\":[],\"streamIds\":[\"5\"]}\u001E", messages[3]); + + stream.onNext("FirstItem"); + + messages = mockTransport.getSentMessages(); + assertEquals(7, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[4]); + assertEquals("{\"type\":2,\"invocationId\":\"3\",\"item\":\"FirstItem\"}\u001E", messages[5]); + assertEquals("{\"type\":2,\"invocationId\":\"5\",\"item\":\"FirstItem\"}\u001E", messages[6]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(10, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[7]); + assertEquals("{\"type\":3,\"invocationId\":\"3\"}\u001E", messages[8]); + assertEquals("{\"type\":3,\"invocationId\":\"5\"}\u001E", messages[9]); + } + + @Test + public void streamUploadCallOnError() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream); + + stream.onNext("FirstItem"); + stream.onError(new RuntimeException("onError called")); + String[] messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + assertEquals("{\"type\":3,\"invocationId\":\"1\",\"error\":\"java.lang.RuntimeException: onError called\"}\u001E", messages[3]); + + // onComplete doesn't send a completion message after onError. + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + } + + @Test + public void checkStreamUploadMultipleItemsThroughSend() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.send("UploadStream", stream); + + stream.onNext("FirstItem"); + stream.onNext("SecondItem"); + stream.onNext("ThirdItem"); + + String[] messages = mockTransport.getSentMessages(); + assertEquals(5, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"FirstItem\"}\u001E", messages[2]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"SecondItem\"}\u001E", messages[3]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"ThirdItem\"}\u001E", messages[4]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(6, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[5]); + } + + @Test + public void checkStreamUploadMultipleItemsThroughInvoke() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + ReplaySubject stream = ReplaySubject.create(); + hubConnection.invoke(String.class, "UploadStream", stream); + + stream.onNext("FirstItem"); + stream.onNext("SecondItem"); + + String[] messages = mockTransport.getSentMessages(); + assertEquals(4, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"FirstItem\"}\u001E", messages[2]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"SecondItem\"}\u001E", messages[3]); + + stream.onComplete(); + messages = mockTransport.getSentMessages(); + assertEquals(5, messages.length); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[4]); + } + + @Test + public void canStartAndStopMultipleStreams() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + PublishSubject streamOne = PublishSubject.create(); + PublishSubject streamTwo = PublishSubject.create(); + + hubConnection.send("UploadStream", streamOne); + hubConnection.send("UploadStream", streamTwo); + + streamOne.onNext("Stream One First Item"); + streamTwo.onNext("Stream Two First Item"); + streamOne.onNext("Stream One Second Item"); + streamTwo.onNext("Stream Two Second Item"); + + streamOne.onComplete(); + streamTwo.onComplete(); + String[] messages = mockTransport.getSentMessages(); + + // Handshake message + 2 calls to send + 4 calls to onNext + 2 calls to onComplete = 9 + assertEquals(9, messages.length); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"Stream One First Item\"}\u001E", messages[3]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"Stream Two First Item\"}\u001E", messages[4]); + assertEquals("{\"type\":2,\"invocationId\":\"1\",\"item\":\"Stream One Second Item\"}\u001E", messages[5]); + assertEquals("{\"type\":2,\"invocationId\":\"2\",\"item\":\"Stream Two Second Item\"}\u001E", messages[6]); + assertEquals("{\"type\":3,\"invocationId\":\"1\"}\u001E", messages[7]); + assertEquals("{\"type\":3,\"invocationId\":\"2\"}\u001E", messages[8]); + } + @Test public void checkStreamSingleItem() { MockTransport mockTransport = new MockTransport(); @@ -426,7 +700,6 @@ class HubConnectionTest { assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst()); assertEquals("COMPLETED", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast()); - } @Test