From cdd7387159190d4e3c1755ffc0830d5b5073bb28 Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Fri, 16 Nov 2018 16:29:32 -0800 Subject: [PATCH] Java Client Steaming (#3301) --- .../signalr/CancelInvocationMessage.java | 18 ++ .../com/microsoft/signalr/HubConnection.java | 64 ++++- .../microsoft/signalr/InvocationRequest.java | 21 +- .../microsoft/signalr/JsonHubProtocol.java | 13 +- .../signalr/StreamInvocationMessage.java | 23 +- .../com/microsoft/signalr/StreamItem.java | 28 +++ .../microsoft/signalr/HubConnectionTest.java | 219 ++++++++++++++++++ .../signalr/JsonHubProtocolTest.java | 9 - 8 files changed, 368 insertions(+), 27 deletions(-) create mode 100644 clients/java/signalr/src/main/java/com/microsoft/signalr/CancelInvocationMessage.java create mode 100644 clients/java/signalr/src/main/java/com/microsoft/signalr/StreamItem.java diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/CancelInvocationMessage.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/CancelInvocationMessage.java new file mode 100644 index 0000000000..096c49faf0 --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/CancelInvocationMessage.java @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +final class CancelInvocationMessage extends HubMessage { + private final int type = HubMessageType.CANCEL_INVOCATION.value; + private final String invocationId; + + public CancelInvocationMessage(String invocationId) { + this.invocationId = invocationId; + } + + @Override + public HubMessageType getMessageType() { + return HubMessageType.CANCEL_INVOCATION; + } +} diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 485137d0a6..5b255ec9bd 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -21,9 +21,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.reactivex.Completable; +import io.reactivex.Observable; import io.reactivex.Single; -import io.reactivex.subjects.CompletableSubject; -import io.reactivex.subjects.SingleSubject; +import io.reactivex.subjects.*; /** * A connection used to invoke hub methods on a SignalR Server. @@ -58,7 +58,6 @@ public class HubConnection { private long handshakeResponseTimeout = 15*1000; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); - /** * Sets the server timeout interval for the connection. * @@ -202,8 +201,17 @@ public class HubConnection { } irq.complete(completionMessage); break; - case STREAM_INVOCATION: case STREAM_ITEM: + StreamItem streamItem = (StreamItem)message; + InvocationRequest streamInvocationRequest = connectionState.getInvocation(streamItem.getInvocationId()); + if (streamInvocationRequest == null) { + logger.warn("Dropped unsolicited Completion message for invocation '{}'.", streamItem.getInvocationId()); + continue; + } + + streamInvocationRequest.addItem(streamItem); + break; + case STREAM_INVOCATION: case CANCEL_INVOCATION: logger.error("This client does not support {} messages.", message.getMessageType()); @@ -481,7 +489,7 @@ public class HubConnection { // forward the invocation result or error to the user // run continuations on a separate thread - Single pendingCall = irq.getPendingCall(); + Subject pendingCall = irq.getPendingCall(); pendingCall.subscribe(result -> { // Primitive types can't be cast with the Class cast function if (returnType.isPrimitive()) { @@ -498,10 +506,54 @@ public class HubConnection { return subject; } + /** + * Invokes a streaming hub method on the server using the specified name and arguments. + * + * @param returnType The expected return type of the stream items. + * @param method The name of the server method to invoke. + * @param args The arguments used to invoke the server method. + * @param The expected return type. + * @return An observable that yields the streaming results from the server. + */ + @SuppressWarnings("unchecked") + public Observable stream(Class returnType, String method, Object ... args) { + String invocationId = connectionState.getNextInvocationId(); + AtomicInteger subscriptionCount = new AtomicInteger(); + StreamInvocationMessage streamInvocationMessage = new StreamInvocationMessage(invocationId, method, args); + InvocationRequest irq = new InvocationRequest(returnType, invocationId); + connectionState.addInvocation(irq); + ReplaySubject subject = ReplaySubject.create(); + + Subject pendingCall = irq.getPendingCall(); + pendingCall.subscribe(result -> { + // Primitive types can't be cast with the Class cast function + if (returnType.isPrimitive()) { + subject.onNext((T)result); + } else { + subject.onNext(returnType.cast(result)); + } + }, error -> subject.onError(error), + () -> subject.onComplete()); + + sendHubMessage(streamInvocationMessage); + Observable observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet()); + + return observable.doOnDispose(() -> { + if (subscriptionCount.decrementAndGet() == 0) { + CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId); + sendHubMessage(cancelInvocationMessage); + connectionState.tryRemoveInvocation(invocationId); + subject.onComplete(); + } + }); + } + private void sendHubMessage(HubMessage message) { String serializedMessage = protocol.writeMessage(message); - if (message.getMessageType() == HubMessageType.INVOCATION) { + if (message.getMessageType() == HubMessageType.INVOCATION ) { logger.debug("Sending {} message '{}'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId()); + } else if (message.getMessageType() == HubMessageType.STREAM_INVOCATION) { + logger.debug("Sending {} message '{}'.", message.getMessageType().name(), ((StreamInvocationMessage)message).getInvocationId()); } else { logger.debug("Sending {} message.", message.getMessageType().name()); } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java index 3d12d39d04..1c936da663 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationRequest.java @@ -5,12 +5,12 @@ package com.microsoft.signalr; import java.util.concurrent.CancellationException; -import io.reactivex.Single; -import io.reactivex.subjects.SingleSubject; +import io.reactivex.subjects.ReplaySubject; +import io.reactivex.subjects.Subject; class InvocationRequest { private final Class returnType; - private final SingleSubject pendingCall = SingleSubject.create(); + private final Subject pendingCall = ReplaySubject.create(); private final String invocationId; InvocationRequest(Class returnType, String invocationId) { @@ -19,13 +19,22 @@ class InvocationRequest { } public void complete(CompletionMessage completion) { - if (completion.getResult() != null) { - pendingCall.onSuccess(completion.getResult()); + if (completion.getError() == null) { + if (completion.getResult() != null) { + pendingCall.onNext(completion.getResult()); + } + pendingCall.onComplete(); } else { pendingCall.onError(new HubException(completion.getError())); } } + public void addItem(StreamItem streamItem) { + if (streamItem.getResult() != null) { + pendingCall.onNext(streamItem.getResult()); + } + } + public void fail(Exception ex) { pendingCall.onError(ex); } @@ -34,7 +43,7 @@ class InvocationRequest { pendingCall.onError(new CancellationException("Invocation was canceled.")); } - public Single getPendingCall() { + public Subject getPendingCall() { return pendingCall; } diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java index 8ff1024b4b..b0005f8cd7 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java @@ -73,7 +73,7 @@ class JsonHubProtocol implements HubProtocol { error = reader.nextString(); break; case "result": - if (invocationId == null) { + if (invocationId == null || binder.getReturnType(invocationId) == null) { resultToken = jsonParser.parse(reader); } else { result = gson.fromJson(reader, binder.getReturnType(invocationId)); @@ -142,12 +142,19 @@ class JsonHubProtocol implements HubProtocol { break; case COMPLETION: if (resultToken != null) { - result = gson.fromJson(resultToken, binder.getReturnType(invocationId)); + Class returnType = binder.getReturnType(invocationId); + result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class); } hubMessages.add(new CompletionMessage(invocationId, result, error)); break; - case STREAM_INVOCATION: case STREAM_ITEM: + if (resultToken != null) { + Class returnType = binder.getReturnType(invocationId); + result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class); + } + hubMessages.add(new StreamItem(invocationId, result)); + break; + case STREAM_INVOCATION: case CANCEL_INVOCATION: throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", messageType)); case PING: diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java index eb4ae0e479..f897195667 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java @@ -3,11 +3,28 @@ package com.microsoft.signalr; -class StreamInvocationMessage extends InvocationMessage { +final class StreamInvocationMessage extends HubMessage { private final int type = HubMessageType.STREAM_INVOCATION.value; + private final String invocationId; + private final String target; + private final Object[] arguments; - public StreamInvocationMessage(String invocationId, String target, Object[] arguments) { - super(invocationId, target, arguments); + public StreamInvocationMessage(String invocationId, String target, Object[] args) { + this.invocationId = invocationId; + this.target = target; + this.arguments = args; + } + + public String getInvocationId() { + return invocationId; + } + + public String getTarget() { + return target; + } + + public Object[] getArguments() { + return arguments; } @Override diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamItem.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamItem.java new file mode 100644 index 0000000000..e39941d04c --- /dev/null +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamItem.java @@ -0,0 +1,28 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +final class StreamItem extends HubMessage { + private final int type = HubMessageType.STREAM_ITEM.value; + private final String invocationId; + private final Object result; + + public StreamItem(String invocationId, Object result) { + this.invocationId = invocationId; + this.result = result; + } + + public String getInvocationId() { + return invocationId; + } + + public Object getResult() { + return result; + } + + @Override + public HubMessageType getMessageType() { + return HubMessageType.STREAM_ITEM; + } +} diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index 6c29be1c82..cab2ae0ecb 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -5,6 +5,7 @@ package com.microsoft.signalr; import static org.junit.jupiter.api.Assertions.*; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -15,7 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; +import io.reactivex.Observable; import io.reactivex.Single; +import io.reactivex.disposables.Disposable; import io.reactivex.subjects.SingleSubject; class HubConnectionTest { @@ -367,6 +370,222 @@ class HubConnectionTest { assertEquals(Double.valueOf(24), value.get()); } + @Test + public void checkStreamSingleItem() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean completed = new AtomicBoolean(); + AtomicBoolean onNextCalled = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + result.subscribe((item) -> onNextCalled.set(true), + (error) -> {}, + () -> completed.set(true)); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(completed.get()); + assertFalse(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + + assertTrue(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"hello\"}" + RECORD_SEPARATOR); + assertTrue(completed.get()); + + assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst()); + } + + @Test + public void checkStreamCompletionResult() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean completed = new AtomicBoolean(); + AtomicBoolean onNextCalled = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + result.subscribe((item) -> onNextCalled.set(true), + (error) -> {}, + () -> completed.set(true)); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(completed.get()); + assertFalse(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + + assertTrue(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"COMPLETED\"}" + RECORD_SEPARATOR); + assertTrue(completed.get()); + + assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst()); + assertEquals("COMPLETED", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast()); + + } + + @Test + public void checkStreamCompletionError() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean onErrorCalled = new AtomicBoolean(); + AtomicBoolean onNextCalled = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + result.subscribe((item) -> onNextCalled.set(true), + (error) -> onErrorCalled.set(true), + () -> {}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(onErrorCalled.get()); + assertFalse(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + + assertTrue(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR); + assertTrue(onErrorCalled.get()); + + assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst()); + Throwable exception = assertThrows(HubException.class, () -> result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast()); + assertEquals("There was an error", exception.getMessage()); + } + + @Test + public void checkStreamMultipleItems() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean completed = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + result.subscribe((item) -> {/*OnNext*/ }, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/completed.set(true);}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(completed.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR); + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"null\"}" + RECORD_SEPARATOR); + + Iterator resultIterator = result.timeout(1000, TimeUnit.MILLISECONDS).blockingIterable().iterator(); + assertEquals("First", resultIterator.next()); + assertEquals("Second", resultIterator.next()); + assertTrue(completed.get()); + } + + @Test + public void checkCancelIsSentAfterDispose() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean completed = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + Disposable subscription = result.subscribe((item) -> {/*OnNext*/ }, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/completed.set(true);}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(completed.get()); + + subscription.dispose(); + assertEquals("{\"type\":5,\"invocationId\":\"1\"}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[2]); + } + + @Test + public void checkCancelIsSentAfterAllSubscriptionsAreDisposed() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + Observable result = hubConnection.stream(String.class, "echo", "message"); + Disposable subscription = result.subscribe((item) -> {/*OnNext*/ }, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/}); + + Disposable secondSubscription = result.subscribe((item) -> {/*OnNext*/ }, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/}); + + subscription.dispose(); + assertEquals(2, mockTransport.getSentMessages().length); + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, + mockTransport.getSentMessages()[mockTransport.getSentMessages().length - 1]); + + secondSubscription.dispose(); + assertEquals(3, mockTransport.getSentMessages().length); + assertEquals("{\"type\":5,\"invocationId\":\"1\"}" + RECORD_SEPARATOR, + mockTransport.getSentMessages()[mockTransport.getSentMessages().length - 1]); + } + + @Test + public void checkStreamWithDispose() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + Observable result = hubConnection.stream(String.class, "echo", "message"); + Disposable subscription = result.subscribe((item) -> {/*OnNext*/}, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + + subscription.dispose(); + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR); + + assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast()); + } + + @Test + public void checkStreamWithDisposeWithMultipleSubscriptions() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean completed = new AtomicBoolean(); + Observable result = hubConnection.stream(String.class, "echo", "message"); + Disposable subscription = result.subscribe((item) -> {/*OnNext*/}, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/}); + + Disposable subscription2 = result.subscribe((item) -> {/*OnNext*/}, + (error) -> {/*OnError*/}, + () -> {/*OnCompleted*/completed.set(true);}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(completed.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR); + + subscription.dispose(); + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\"}" + RECORD_SEPARATOR); + assertTrue(completed.get()); + assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst()); + + subscription2.dispose(); + assertEquals("Second", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast()); + } + @Test public void invokeWaitsForCompletionMessage() { MockTransport mockTransport = new MockTransport(); diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/JsonHubProtocolTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/JsonHubProtocolTest.java index 8a8ad9f3cd..f4df7a24b6 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/JsonHubProtocolTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/JsonHubProtocolTest.java @@ -108,15 +108,6 @@ class JsonHubProtocolTest { assertEquals(42, messageResult); } - @Test - public void parseSingleUnsupportedStreamItemMessage() { - String stringifiedMessage = "{\"type\":2,\"Id\":1,\"Item\":42}\u001E"; - TestBinder binder = new TestBinder(null); - - Throwable exception = assertThrows(UnsupportedOperationException.class, () -> jsonHubProtocol.parseMessages(stringifiedMessage, binder)); - assertEquals("The message type STREAM_ITEM is not supported yet.", exception.getMessage()); - } - @Test public void parseSingleUnsupportedStreamInvocationMessage() { String stringifiedMessage = "{\"type\":4,\"Id\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";