From c987ce5b3c6590cf4b2ddd7516b3c393702316cc Mon Sep 17 00:00:00 2001 From: Mikael Mengistu Date: Thu, 20 Jun 2019 12:54:22 -0700 Subject: [PATCH] Add Invoke API with with no completion result (#11399) --- .../microsoft/signalr/CompletionMessage.java | 4 +- .../com/microsoft/signalr/HubConnection.java | 34 +++++- .../microsoft/signalr/HubConnectionTest.java | 106 ++++++++++++++++-- 3 files changed, 133 insertions(+), 11 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/CompletionMessage.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/CompletionMessage.java index 15d85d63d7..4cd5f68263 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/CompletionMessage.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/CompletionMessage.java @@ -11,7 +11,7 @@ final class CompletionMessage extends HubMessage { public CompletionMessage(String invocationId, Object result, String error) { if (error != null && result != null) { - throw new IllegalArgumentException("Expected either 'error' or 'result' to be provided, but not both"); + throw new IllegalArgumentException("Expected either 'error' or 'result' to be provided, but not both."); } this.invocationId = invocationId; this.result = result; @@ -34,4 +34,4 @@ final class CompletionMessage extends HubMessage { public HubMessageType getMessageType() { return HubMessageType.values()[type - 1]; } -} \ No newline at end of file +} diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index b3b564b711..67669576e7 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -591,10 +591,42 @@ public class HubConnection { this.streamMap.put(streamId, stream); } } - + return params.toArray(); } + + /** + * Invokes a hub method on the server using the specified method name and arguments. + * + * @param method The name of the server method to invoke. + * @param args The arguments used to invoke the server method. + * @return A Completable that indicates when the invocation has completed. + */ + @SuppressWarnings("unchecked") + public Completable invoke(String method, Object... args) { + if (hubConnectionState != HubConnectionState.CONNECTED) { + throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active."); + } + + String id = connectionState.getNextInvocationId(); + + CompletableSubject subject = CompletableSubject.create(); + InvocationRequest irq = new InvocationRequest(null, id); + connectionState.addInvocation(irq); + + Subject pendingCall = irq.getPendingCall(); + + pendingCall.subscribe(result -> subject.onComplete(), + error -> subject.onError(error), + () -> subject.onComplete()); + + // Make sure the actual send is after setting up the callbacks otherwise there is a race + // where the map doesn't have the callbacks yet when the response is returned + sendInvocationMessage(method, args, id, false); + return subject; + } + /** * Invokes a hub method on the server using the specified method name and arguments. * diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index b85b502fb3..b5104919ab 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; +import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.Single; import io.reactivex.disposables.Disposable; @@ -163,7 +164,6 @@ class HubConnectionTest { assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); assertEquals("http://example.com", hubConnection.getBaseUrl()); - Throwable exception = assertThrows(IllegalStateException.class, () -> hubConnection.setBaseUrl("http://newurl.com")); assertEquals("The HubConnection must be in the disconnected state to change the url.",exception.getMessage()); } @@ -946,15 +946,100 @@ class HubConnectionTest { AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(Integer.class, "echo", "message"); - result.doOnSuccess(value -> done.set(true)); + result.doOnSuccess(value -> done.set(true)).subscribe(); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); } - + + @Test + public void invokeNoReturnValueWaitsForCompletion() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean done = new AtomicBoolean(); + Completable result = hubConnection.invoke("test", "message"); + result.doOnComplete(() -> done.set(true)).subscribe(); + + assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"test\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(done.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\"}" + RECORD_SEPARATOR); + + assertNull(result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); + } + + @Test + public void invokeCompletedByCompletionMessageWithResult() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean done = new AtomicBoolean(); + Completable result = hubConnection.invoke("test", "message"); + result.doOnComplete(() -> done.set(true)).subscribe(); + + assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"test\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(done.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); + + assertNull(result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); + } + + @Test + public void completionWithResultAndErrorHandlesError() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean done = new AtomicBoolean(); + Completable result = hubConnection.invoke("test", "message"); + result.doOnComplete(() -> done.set(true)).subscribe(() -> {}, (error) -> {}); + + assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"test\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(done.get()); + + Throwable exception = assertThrows(IllegalArgumentException.class, () -> mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42,\"error\":\"There was an error\"}" + RECORD_SEPARATOR)); + assertEquals("Expected either 'error' or 'result' to be provided, but not both.", exception.getMessage()); + } + + @Test + public void invokeNoReturnValueHandlesError() { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean done = new AtomicBoolean(); + Completable result = hubConnection.invoke("test", "message"); + result.doOnComplete(() -> done.set(true)).subscribe(() -> {}, (error) -> {}); + + assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"test\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); + assertFalse(done.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR); + + result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet(); + + AtomicReference errorMessage = new AtomicReference<>(); + result.doOnError(error -> { + errorMessage.set(error.getMessage()); + }).subscribe(() -> {}, (error) -> {}); + + assertEquals("There was an error", errorMessage.get()); + } + @Test public void canSendNullArgInInvocation() { MockTransport mockTransport = new MockTransport(); @@ -964,13 +1049,14 @@ class HubConnectionTest { AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(String.class, "fixedMessage", null); - result.doOnSuccess(value -> done.set(true)); + result.doOnSuccess(value -> done.set(true)).subscribe(); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"fixedMessage\",\"arguments\":[null]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"Hello World\"}" + RECORD_SEPARATOR); assertEquals("Hello World", result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); } @Test @@ -982,13 +1068,14 @@ class HubConnectionTest { AtomicBoolean done = new AtomicBoolean(); Single result = hubConnection.invoke(String.class, "fixedMessage", null, null); - result.doOnSuccess(value -> done.set(true)); + result.doOnSuccess(value -> done.set(true)).subscribe(); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"fixedMessage\",\"arguments\":[null,null]}"+ RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"Hello World\"}" + RECORD_SEPARATOR); assertEquals("Hello World", result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); } @Test @@ -1002,8 +1089,8 @@ class HubConnectionTest { AtomicBoolean doneSecond = new AtomicBoolean(); Single result = hubConnection.invoke(Integer.class, "echo", "message"); Single result2 = hubConnection.invoke(String.class, "echo", "message"); - result.doOnSuccess(value -> doneFirst.set(true)); - result2.doOnSuccess(value -> doneSecond.set(true)); + result.doOnSuccess(value -> doneFirst.set(true)).subscribe(); + result2.doOnSuccess(value -> doneSecond.set(true)).subscribe(); assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]); assertEquals("{\"type\":1,\"invocationId\":\"2\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[2]); assertFalse(doneFirst.get()); @@ -1012,9 +1099,11 @@ class HubConnectionTest { mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"2\",\"result\":\"message\"}" + RECORD_SEPARATOR); assertEquals("message", result2.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); assertFalse(doneFirst.get()); + assertTrue(doneSecond.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(doneFirst.get()); } @Test @@ -1028,12 +1117,13 @@ class HubConnectionTest { // int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type // which does not work for primitives we have to write special logic for that case. Single result = hubConnection.invoke(int.class, "echo", "message"); - result.doOnSuccess(value -> done.set(true)); + result.doOnSuccess(value -> done.set(true)).subscribe(); assertFalse(done.get()); mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR); assertEquals(Integer.valueOf(42), result.timeout(1000, TimeUnit.MILLISECONDS).blockingGet()); + assertTrue(done.get()); } @Test