[Java] Add Invoke support (#2961)

This commit is contained in:
BrennanConroy 2018-09-19 13:58:15 -07:00 committed by GitHub
parent 8be051ce34
commit 4b378692a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 367 additions and 43 deletions

View File

@ -4,7 +4,7 @@
package com.microsoft.aspnet.signalr;
class CloseMessage extends HubMessage {
String error;
private String error;
@Override
public HubMessageType getMessageType() {

View File

@ -0,0 +1,38 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
class CompletionMessage extends HubMessage {
private int type = HubMessageType.COMPLETION.value;
private String invocationId;
private Object result;
private String error;
public CompletionMessage(String invocationId, Object result, String error) {
if (error != null && result != null)
{
throw new IllegalArgumentException("Expected either 'error' or 'result' to be provided, but not both");
}
this.invocationId = invocationId;
this.result = result;
this.error = error;
}
public Object getResult() {
return result;
}
public String getError() {
return error;
}
public String getInvocationId() {
return invocationId;
}
@Override
public HubMessageType getMessageType() {
return HubMessageType.values()[type - 1];
}
}

View File

@ -9,6 +9,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -71,13 +72,13 @@ public class HubConnection {
switch (message.getMessageType()) {
case INVOCATION:
InvocationMessage invocationMessage = (InvocationMessage) message;
List<InvocationHandler> handlers = this.handlers.get(invocationMessage.target);
List<InvocationHandler> handlers = this.handlers.get(invocationMessage.getTarget());
if (handlers != null) {
for (InvocationHandler handler : handlers) {
handler.getAction().invoke(invocationMessage.arguments);
handler.getAction().invoke(invocationMessage.getArguments());
}
} else {
logger.log(LogLevel.Warning, "Failed to find handler for %s method.", invocationMessage.target);
logger.log(LogLevel.Warning, "Failed to find handler for %s method.", invocationMessage.getMessageType());
}
break;
case CLOSE:
@ -88,10 +89,18 @@ public class HubConnection {
case PING:
// We don't need to do anything in the case of a ping message.
break;
case COMPLETION:
CompletionMessage completionMessage = (CompletionMessage)message;
InvocationRequest irq = connectionState.tryRemoveInvocation(completionMessage.getInvocationId());
if (irq == null) {
logger.log(LogLevel.Warning, "Dropped unsolicited Completion message for invocation '%s'.", completionMessage.getInvocationId());
continue;
}
irq.complete(completionMessage);
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
case CANCEL_INVOCATION:
case COMPLETION:
logger.log(LogLevel.Error, "This client does not support %s messages.", message.getMessageType());
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", message.getMessageType()));
@ -220,6 +229,7 @@ public class HubConnection {
* Stops a connection to the server.
*/
private void stop(String errorMessage) {
HubException hubException = null;
hubConnectionStateLock.lock();
try {
if (hubConnectionState == HubConnectionState.DISCONNECTED) {
@ -234,6 +244,11 @@ public class HubConnection {
transport.stop();
hubConnectionState = HubConnectionState.DISCONNECTED;
if (errorMessage != null) {
hubException = new HubException(errorMessage);
}
connectionState.cancelOutstandingInvocations(hubException);
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
} finally {
@ -241,7 +256,6 @@ public class HubConnection {
}
if (onClosedCallbackList != null) {
HubException hubException = new HubException(errorMessage);
for (Consumer<Exception> callback : onClosedCallbackList) {
callback.accept(hubException);
}
@ -268,10 +282,49 @@ public class HubConnection {
throw new HubException("The 'send' method cannot be called if the connection is not active");
}
InvocationMessage invocationMessage = new InvocationMessage(method, args);
String message = protocol.writeMessage(invocationMessage);
logger.log(LogLevel.Debug, "Sending message");
transport.send(message);
InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
sendHubMessage(invocationMessage);
}
public <T> CompletableFuture<T> invoke(Class<T> returnType, String method, Object... args) throws Exception {
String id = connectionState.getNextInvocationId();
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);
CompletableFuture<T> future = new CompletableFuture<>();
InvocationRequest irq = new InvocationRequest(returnType, id);
connectionState.addInvocation(irq);
// forward the invocation result or error to the user
// run continuations on a separate thread
CompletableFuture<Object> pendingCall = irq.getPendingCall();
pendingCall.whenCompleteAsync((result, error) -> {
if (error == null) {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
future.complete((T)result);
} else {
future.complete(returnType.cast(result));
}
} else {
future.completeExceptionally(error);
}
});
// Make sure the actual send is after setting up the future otherwise there is a race
// where the map doesn't have the future yet when the response is returned
sendHubMessage(invocationMessage);
return future;
}
private void sendHubMessage(HubMessage message) throws Exception {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
logger.log(LogLevel.Debug, "Sending %d message '%s'.", message.getMessageType().value, ((InvocationMessage)message).getInvocationId());
} else {
logger.log(LogLevel.Debug, "Sending %d message.", message.getMessageType().value);
}
transport.send(serializedMessage);
}
/**
@ -559,15 +612,79 @@ public class HubConnection {
}
private class ConnectionState implements InvocationBinder {
HubConnection connection;
private HubConnection connection;
private AtomicInteger nextId = new AtomicInteger(0);
private HashMap<String, InvocationRequest> pendingInvocations = new HashMap<>();
private Lock lock = new ReentrantLock();
public ConnectionState(HubConnection connection) {
this.connection = connection;
}
public String getNextInvocationId() {
int i = nextId.incrementAndGet();
return Integer.toString(i);
}
public void cancelOutstandingInvocations(Exception ex) {
lock.lock();
try {
pendingInvocations.forEach((key, irq) -> {
if (ex == null) {
irq.cancel();
} else {
irq.fail(ex);
}
});
pendingInvocations.clear();
} finally {
lock.unlock();
}
}
public void addInvocation(InvocationRequest irq) {
lock.lock();
try {
pendingInvocations.compute(irq.getInvocationId(), (key, value) -> {
if (value != null) {
// This should never happen
throw new IllegalStateException("Invocation Id is already used");
}
return irq;
});
} finally {
lock.unlock();
}
}
public InvocationRequest getInvocation(String id) {
lock.lock();
try {
return pendingInvocations.get(id);
} finally {
lock.unlock();
}
}
public InvocationRequest tryRemoveInvocation(String id) {
lock.lock();
try {
return pendingInvocations.remove(id);
} finally {
lock.unlock();
}
}
@Override
public Class<?> getReturnType(String invocationId) {
return null;
InvocationRequest irq = getInvocation(invocationId);
if (irq == null) {
return null;
}
return irq.getReturnType();
}
@Override

View File

@ -3,7 +3,6 @@
package com.microsoft.aspnet.signalr;
/**
* A protocol abstraction for communicating with SignalR hubs.
*/

View File

@ -5,11 +5,12 @@ package com.microsoft.aspnet.signalr;
class InvocationMessage extends HubMessage {
int type = HubMessageType.INVOCATION.value;
String invocationId;
String target;
Object[] arguments;
protected String invocationId;
private String target;
private Object[] arguments;
public InvocationMessage(String target, Object[] args) {
public InvocationMessage(String invocationId, String target, Object[] args) {
this.invocationId = invocationId;
this.target = target;
this.arguments = args;
}
@ -18,10 +19,6 @@ class InvocationMessage extends HubMessage {
return invocationId;
}
public void setInvocationId(String invocationId) {
this.invocationId = invocationId;
}
public String getTarget() {
return target;
}

View File

@ -0,0 +1,45 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.aspnet.signalr;
import java.util.concurrent.CompletableFuture;
class InvocationRequest {
private Class<?> returnType;
private CompletableFuture<Object> pendingCall = new CompletableFuture<>();
private String invocationId;
InvocationRequest(Class<?> returnType, String invocationId) {
this.returnType = returnType;
this.invocationId = invocationId;
}
public void complete(CompletionMessage completion) {
if (completion.getResult() != null) {
pendingCall.complete(completion.getResult());
} else {
pendingCall.completeExceptionally(new HubException(completion.getError()));
}
}
public void fail(Exception ex) {
pendingCall.completeExceptionally(ex);
}
public void cancel() {
pendingCall.cancel(false);
}
public CompletableFuture<Object> getPendingCall() {
return pendingCall;
}
public Class<?> getReturnType() {
return returnType;
}
public String getInvocationId() {
return invocationId;
}
}

View File

@ -9,6 +9,7 @@ import java.util.List;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
@ -43,6 +44,8 @@ class JsonHubProtocol implements HubProtocol {
String error = null;
ArrayList<Object> arguments = null;
JsonArray argumentsToken = null;
Object result = null;
JsonElement resultToken = null;
JsonReader reader = new JsonReader(new StringReader(str));
reader.beginObject();
@ -63,7 +66,11 @@ class JsonHubProtocol implements HubProtocol {
error = reader.nextString();
break;
case "result":
reader.skipValue();
if (invocationId == null) {
resultToken = jsonParser.parse(reader);
} else {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
}
break;
case "item":
reader.skipValue();
@ -107,14 +114,19 @@ class JsonHubProtocol implements HubProtocol {
}
}
if (arguments == null) {
hubMessages.add(new InvocationMessage(target, new Object[0]));
hubMessages.add(new InvocationMessage(invocationId, target, new Object[0]));
} else {
hubMessages.add(new InvocationMessage(target, arguments.toArray()));
hubMessages.add(new InvocationMessage(invocationId, target, arguments.toArray()));
}
break;
case COMPLETION:
if (resultToken != null) {
result = gson.fromJson(resultToken, binder.getReturnType(invocationId));
}
hubMessages.add(new CompletionMessage(invocationId, result, error));
break;
case STREAM_INVOCATION:
case STREAM_ITEM:
case COMPLETION:
case CANCEL_INVOCATION:
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", messageType));
case PING:

View File

@ -8,8 +8,7 @@ class StreamInvocationMessage extends InvocationMessage {
int type = HubMessageType.STREAM_INVOCATION.value;
public StreamInvocationMessage(String invocationId, String target, Object[] arguments) {
super(target, arguments);
this.invocationId = invocationId;
super(invocationId, target, arguments);
}
@Override

View File

@ -6,7 +6,9 @@ package com.microsoft.aspnet.signalr;
import static org.junit.jupiter.api.Assertions.*;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
@ -298,12 +300,117 @@ public class HubConnectionTest {
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR);
hubConnection.send("add", 12);
// Confirming that our handler was called and the correct message was passed in.
assertEquals(Double.valueOf(24), value.get());
}
@Test
public void invokeWaitsForCompletionMessage() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, true);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.sentMessages.get(1));
assertFalse(result.isDone());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
}
@Test
public void multipleInvokesWaitForOwnCompletionMessage() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, true);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(Integer.class, "echo", "message");
CompletableFuture<String> result2 = hubConnection.invoke(String.class, "echo", "message");
assertEquals("{\"type\":1,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.sentMessages.get(1));
assertEquals("{\"type\":1,\"invocationId\":\"2\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, mockTransport.sentMessages.get(2));
assertFalse(result.isDone());
assertFalse(result2.isDone());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"2\",\"result\":\"message\"}" + RECORD_SEPARATOR);
assertEquals("message", result2.get(1000L, TimeUnit.MILLISECONDS));
assertFalse(result.isDone());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
}
@Test
public void invokeWorksForPrimitiveTypes() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, true);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
// int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type
// which does not work for primitives we have to write special logic for that case.
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":42}" + RECORD_SEPARATOR);
assertEquals(Integer.valueOf(42), result.get(1000L, TimeUnit.MILLISECONDS));
}
@Test
public void completionMessageCanHaveError() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, true);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"error\":\"There was an error\"}" + RECORD_SEPARATOR);
String exceptionMessage = null;
try {
result.get(1000L, TimeUnit.MILLISECONDS);
assertFalse(true);
} catch (Exception ex) {
exceptionMessage = ex.getMessage();
}
assertEquals("com.microsoft.aspnet.signalr.HubException: There was an error", exceptionMessage);
}
@Test
public void stopCancelsActiveInvokes() throws Exception {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = new HubConnection("http://example.com", mockTransport, true);
hubConnection.start();
mockTransport.receiveMessage("{}" + RECORD_SEPARATOR);
CompletableFuture<Integer> result = hubConnection.invoke(int.class, "echo", "message");
assertFalse(result.isDone());
hubConnection.stop();
boolean hasException = false;
try {
result.get(1000L, TimeUnit.MILLISECONDS);
assertFalse(true);
} catch (CancellationException ex) {
hasException = true;
}
assertTrue(hasException);
}
@Test
public void sendWithNoParamsTriggersOnHandler() throws Exception {
AtomicReference<Integer> value = new AtomicReference<>(0);

View File

@ -32,7 +32,7 @@ public class JsonHubProtocolTest {
@Test
public void verifyWriteMessage() {
InvocationMessage invocationMessage = new InvocationMessage("test", new Object[] {"42"});
InvocationMessage invocationMessage = new InvocationMessage(null, "test", new Object[] {"42"});
String result = jsonHubProtocol.writeMessage(invocationMessage);
String expectedResult = "{\"type\":1,\"target\":\"test\",\"arguments\":[\"42\"]}\u001E";
assertEquals(expectedResult, result);
@ -89,7 +89,7 @@ public class JsonHubProtocolTest {
@Test
public void parseSingleMessage() throws Exception {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("test", new Object[] { 42 }));
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42 }));
HubMessage[] messages = jsonHubProtocol.parseMessages(stringifiedMessage, binder);
@ -135,19 +135,10 @@ public class JsonHubProtocolTest {
assertEquals("The message type CANCEL_INVOCATION is not supported yet.", exception.getMessage());
}
@Test
public void parseSingleUnsupportedCompletionMessage() throws Exception {
String stringifiedMessage = "{\"type\":3,\"invocationId\":123}\u001E";
TestBinder binder = new TestBinder(null);
Throwable exception = assertThrows(UnsupportedOperationException.class, () -> jsonHubProtocol.parseMessages(stringifiedMessage, binder));
assertEquals("The message type COMPLETION is not supported yet.", exception.getMessage());
}
@Test
public void parseTwoMessages() throws Exception {
String twoMessages = "{\"type\":1,\"target\":\"one\",\"arguments\":[42]}\u001E{\"type\":1,\"target\":\"two\",\"arguments\":[43]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("one", new Object[] { 42 }));
TestBinder binder = new TestBinder(new InvocationMessage("1", "one", new Object[] { 42 }));
HubMessage[] messages = jsonHubProtocol.parseMessages(twoMessages, binder);
assertEquals(2, messages.length);
@ -178,7 +169,7 @@ public class JsonHubProtocolTest {
@Test
public void parseSingleMessageMutipleArgs() throws Exception {
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":[42, 24]}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("test", new Object[] { 42, 24 }));
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42, 24 }));
HubMessage[] messages = jsonHubProtocol.parseMessages(stringifiedMessage, binder);
@ -197,7 +188,7 @@ public class JsonHubProtocolTest {
@Test
public void parseMessageWithOutOfOrderProperties() throws Exception {
String stringifiedMessage = "{\"arguments\":[42, 24],\"type\":1,\"target\":\"test\"}\u001E";
TestBinder binder = new TestBinder(new InvocationMessage("test", new Object[] { 42, 24 }));
TestBinder binder = new TestBinder(new InvocationMessage("1", "test", new Object[] { 42, 24 }));
HubMessage[] messages = jsonHubProtocol.parseMessages(stringifiedMessage, binder);
@ -213,8 +204,24 @@ public class JsonHubProtocolTest {
assertEquals(24, messageResult2);
}
@Test
public void parseCompletionMessageWithOutOfOrderProperties() throws Exception {
String stringifiedMessage = "{\"type\":3,\"result\":42,\"invocationId\":\"1\"}\u001E";
TestBinder binder = new TestBinder(new CompletionMessage("1", 42, null));
HubMessage[] messages = jsonHubProtocol.parseMessages(stringifiedMessage, binder);
// We know it's only one message
assertEquals(HubMessageType.COMPLETION, messages[0].getMessageType());
CompletionMessage message = (CompletionMessage) messages[0];
assertEquals(null, message.getError());
assertEquals(42 , message.getResult());
}
private class TestBinder implements InvocationBinder {
private Class<?>[] paramTypes = null;
private Class<?> returnType = null;
public TestBinder(HubMessage expectedMessage) {
if (expectedMessage == null) {
@ -238,6 +245,9 @@ public class JsonHubProtocolTest {
break;
case STREAM_ITEM:
break;
case COMPLETION:
returnType = ((CompletionMessage)expectedMessage).getResult().getClass();
break;
default:
break;
}
@ -245,7 +255,7 @@ public class JsonHubProtocolTest {
@Override
public Class<?> getReturnType(String invocationId) {
return null;
return returnType;
}
@Override