Java Client Steaming (#3301)

This commit is contained in:
Mikael Mengistu 2018-11-16 16:29:32 -08:00 committed by GitHub
parent 7015fb20f2
commit cdd7387159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 368 additions and 27 deletions

View File

@ -0,0 +1,18 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
final class CancelInvocationMessage extends HubMessage {
private final int type = HubMessageType.CANCEL_INVOCATION.value;
private final String invocationId;
public CancelInvocationMessage(String invocationId) {
this.invocationId = invocationId;
}
@Override
public HubMessageType getMessageType() {
return HubMessageType.CANCEL_INVOCATION;
}
}

View File

@ -21,9 +21,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.SingleSubject;
import io.reactivex.subjects.*;
/**
* A connection used to invoke hub methods on a SignalR Server.
@ -58,7 +58,6 @@ public class HubConnection {
private long handshakeResponseTimeout = 15*1000;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
/**
* Sets the server timeout interval for the connection.
*
@ -202,8 +201,17 @@ public class HubConnection {
}
irq.complete(completionMessage);
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
StreamItem streamItem = (StreamItem)message;
InvocationRequest streamInvocationRequest = connectionState.getInvocation(streamItem.getInvocationId());
if (streamInvocationRequest == null) {
logger.warn("Dropped unsolicited Completion message for invocation '{}'.", streamItem.getInvocationId());
continue;
}
streamInvocationRequest.addItem(streamItem);
break;
case STREAM_INVOCATION:
case CANCEL_INVOCATION:
logger.error("This client does not support {} messages.", message.getMessageType());
@ -481,7 +489,7 @@ public class HubConnection {
// forward the invocation result or error to the user
// run continuations on a separate thread
Single<Object> pendingCall = irq.getPendingCall();
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
@ -498,10 +506,54 @@ public class HubConnection {
return subject;
}
/**
* Invokes a streaming hub method on the server using the specified name and arguments.
*
* @param returnType The expected return type of the stream items.
* @param method The name of the server method to invoke.
* @param args The arguments used to invoke the server method.
* @param <T> The expected return type.
* @return An observable that yields the streaming results from the server.
*/
@SuppressWarnings("unchecked")
public <T> Observable<T> stream(Class<T> returnType, String method, Object ... args) {
String invocationId = connectionState.getNextInvocationId();
AtomicInteger subscriptionCount = new AtomicInteger();
StreamInvocationMessage streamInvocationMessage = new StreamInvocationMessage(invocationId, method, args);
InvocationRequest irq = new InvocationRequest(returnType, invocationId);
connectionState.addInvocation(irq);
ReplaySubject<T> subject = ReplaySubject.create();
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onNext((T)result);
} else {
subject.onNext(returnType.cast(result));
}
}, error -> subject.onError(error),
() -> subject.onComplete());
sendHubMessage(streamInvocationMessage);
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());
return observable.doOnDispose(() -> {
if (subscriptionCount.decrementAndGet() == 0) {
CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId);
sendHubMessage(cancelInvocationMessage);
connectionState.tryRemoveInvocation(invocationId);
subject.onComplete();
}
});
}
private void sendHubMessage(HubMessage message) {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
if (message.getMessageType() == HubMessageType.INVOCATION ) {
logger.debug("Sending {} message '{}'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
} else if (message.getMessageType() == HubMessageType.STREAM_INVOCATION) {
logger.debug("Sending {} message '{}'.", message.getMessageType().name(), ((StreamInvocationMessage)message).getInvocationId());
} else {
logger.debug("Sending {} message.", message.getMessageType().name());
}

View File

@ -5,12 +5,12 @@ package com.microsoft.signalr;
import java.util.concurrent.CancellationException;
import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
class InvocationRequest {
private final Class<?> returnType;
private final SingleSubject<Object> pendingCall = SingleSubject.create();
private final Subject<Object> pendingCall = ReplaySubject.create();
private final String invocationId;
InvocationRequest(Class<?> returnType, String invocationId) {
@ -19,13 +19,22 @@ class InvocationRequest {
}
public void complete(CompletionMessage completion) {
if (completion.getResult() != null) {
pendingCall.onSuccess(completion.getResult());
if (completion.getError() == null) {
if (completion.getResult() != null) {
pendingCall.onNext(completion.getResult());
}
pendingCall.onComplete();
} else {
pendingCall.onError(new HubException(completion.getError()));
}
}
public void addItem(StreamItem streamItem) {
if (streamItem.getResult() != null) {
pendingCall.onNext(streamItem.getResult());
}
}
public void fail(Exception ex) {
pendingCall.onError(ex);
}
@ -34,7 +43,7 @@ class InvocationRequest {
pendingCall.onError(new CancellationException("Invocation was canceled."));
}
public Single<Object> getPendingCall() {
public Subject<Object> getPendingCall() {
return pendingCall;
}

View File

@ -73,7 +73,7 @@ class JsonHubProtocol implements HubProtocol {
error = reader.nextString();
break;
case "result":
if (invocationId == null) {
if (invocationId == null || binder.getReturnType(invocationId) == null) {
resultToken = jsonParser.parse(reader);
} else {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
@ -142,12 +142,19 @@ class JsonHubProtocol implements HubProtocol {
break;
case COMPLETION:
if (resultToken != null) {
result = gson.fromJson(resultToken, binder.getReturnType(invocationId));
Class<?> returnType = binder.getReturnType(invocationId);
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
}
hubMessages.add(new CompletionMessage(invocationId, result, error));
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
if (resultToken != null) {
Class<?> returnType = binder.getReturnType(invocationId);
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
}
hubMessages.add(new StreamItem(invocationId, result));
break;
case STREAM_INVOCATION:
case CANCEL_INVOCATION:
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", messageType));
case PING:

View File

@ -3,11 +3,28 @@
package com.microsoft.signalr;
class StreamInvocationMessage extends InvocationMessage {
final class StreamInvocationMessage extends HubMessage {
private final int type = HubMessageType.STREAM_INVOCATION.value;
private final String invocationId;
private final String target;
private final Object[] arguments;
public StreamInvocationMessage(String invocationId, String target, Object[] arguments) {
super(invocationId, target, arguments);
public StreamInvocationMessage(String invocationId, String target, Object[] args) {
this.invocationId = invocationId;
this.target = target;
this.arguments = args;
}
public String getInvocationId() {
return invocationId;
}
public String getTarget() {
return target;
}
public Object[] getArguments() {
return arguments;
}
@Override

View File

@ -0,0 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
final class StreamItem extends HubMessage {
private final int type = HubMessageType.STREAM_ITEM.value;
private final String invocationId;
private final Object result;
public StreamItem(String invocationId, Object result) {
this.invocationId = invocationId;
this.result = result;
}
public String getInvocationId() {
return invocationId;
}
public Object getResult() {
return result;
}
@Override
public HubMessageType getMessageType() {
return HubMessageType.STREAM_ITEM;
}
}

View File

@ -5,6 +5,7 @@ package com.microsoft.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -15,7 +16,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.SingleSubject;
class HubConnectionTest {
@ -367,6 +370,222 @@ class HubConnectionTest {
assertEquals(Double.valueOf(24), value.get());
}
@Test
public void checkStreamSingleItem() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean completed = new AtomicBoolean();
AtomicBoolean onNextCalled = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
result.subscribe((item) -> onNextCalled.set(true),
(error) -> {},
() -> completed.set(true));
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(completed.get());
assertFalse(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
assertTrue(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"hello\"}" + RECORD_SEPARATOR);
assertTrue(completed.get());
assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst());
}
@Test
public void checkStreamCompletionResult() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean completed = new AtomicBoolean();
AtomicBoolean onNextCalled = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
result.subscribe((item) -> onNextCalled.set(true),
(error) -> {},
() -> completed.set(true));
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(completed.get());
assertFalse(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
assertTrue(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"COMPLETED\"}" + RECORD_SEPARATOR);
assertTrue(completed.get());
assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst());
assertEquals("COMPLETED", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast());
}
@Test
public void checkStreamCompletionError() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean onErrorCalled = new AtomicBoolean();
AtomicBoolean onNextCalled = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
result.subscribe((item) -> onNextCalled.set(true),
(error) -> onErrorCalled.set(true),
() -> {});
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(onErrorCalled.get());
assertFalse(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
assertTrue(onNextCalled.get());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR);
assertTrue(onErrorCalled.get());
assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst());
Throwable exception = assertThrows(HubException.class, () -> result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast());
assertEquals("There was an error", exception.getMessage());
}
@Test
public void checkStreamMultipleItems() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean completed = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
result.subscribe((item) -> {/*OnNext*/ },
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/completed.set(true);});
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(completed.get());
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":\"null\"}" + RECORD_SEPARATOR);
Iterator<String> resultIterator = result.timeout(1000, TimeUnit.MILLISECONDS).blockingIterable().iterator();
assertEquals("First", resultIterator.next());
assertEquals("Second", resultIterator.next());
assertTrue(completed.get());
}
@Test
public void checkCancelIsSentAfterDispose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean completed = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
Disposable subscription = result.subscribe((item) -> {/*OnNext*/ },
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/completed.set(true);});
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(completed.get());
subscription.dispose();
assertEquals("{\"type\":5,\"invocationId\":\"1\"}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[2]);
}
@Test
public void checkCancelIsSentAfterAllSubscriptionsAreDisposed() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
Disposable subscription = result.subscribe((item) -> {/*OnNext*/ },
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/});
Disposable secondSubscription = result.subscribe((item) -> {/*OnNext*/ },
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/});
subscription.dispose();
assertEquals(2, mockTransport.getSentMessages().length);
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR,
mockTransport.getSentMessages()[mockTransport.getSentMessages().length - 1]);
secondSubscription.dispose();
assertEquals(3, mockTransport.getSentMessages().length);
assertEquals("{\"type\":5,\"invocationId\":\"1\"}" + RECORD_SEPARATOR,
mockTransport.getSentMessages()[mockTransport.getSentMessages().length - 1]);
}
@Test
public void checkStreamWithDispose() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
Disposable subscription = result.subscribe((item) -> {/*OnNext*/},
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/});
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
subscription.dispose();
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR);
assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast());
}
@Test
public void checkStreamWithDisposeWithMultipleSubscriptions() {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
AtomicBoolean completed = new AtomicBoolean();
Observable<String> result = hubConnection.stream(String.class, "echo", "message");
Disposable subscription = result.subscribe((item) -> {/*OnNext*/},
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/});
Disposable subscription2 = result.subscribe((item) -> {/*OnNext*/},
(error) -> {/*OnError*/},
() -> {/*OnCompleted*/completed.set(true);});
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.getSentMessages()[1]);
assertFalse(completed.get());
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"First\"}" + RECORD_SEPARATOR);
subscription.dispose();
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"result\":\"Second\"}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\"}" + RECORD_SEPARATOR);
assertTrue(completed.get());
assertEquals("First", result.timeout(1000, TimeUnit.MILLISECONDS).blockingFirst());
subscription2.dispose();
assertEquals("Second", result.timeout(1000, TimeUnit.MILLISECONDS).blockingLast());
}
@Test
public void invokeWaitsForCompletionMessage() {
MockTransport mockTransport = new MockTransport();

View File

@ -108,15 +108,6 @@ class JsonHubProtocolTest {
assertEquals(42, messageResult);
}
@Test
public void parseSingleUnsupportedStreamItemMessage() {
String stringifiedMessage = "{\"type\":2,\"Id\":1,\"Item\":42}\u001E";
TestBinder binder = new TestBinder(null);
Throwable exception = assertThrows(UnsupportedOperationException.class, () -> jsonHubProtocol.parseMessages(stringifiedMessage, binder));
assertEquals("The message type STREAM_ITEM is not supported yet.", exception.getMessage());
}
@Test
public void parseSingleUnsupportedStreamInvocationMessage() {
String stringifiedMessage = "{\"type\":4,\"Id\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";