Acquire HubConnectionStateLock before Send/Invoke/Stream (#12078)

This commit is contained in:
Mikael Mengistu 2019-07-15 11:31:37 -07:00 committed by GitHub
parent 4c07e1e6ad
commit 8b9503ee9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 75 deletions

View File

@ -537,11 +537,15 @@ public class HubConnection {
* @param args The arguments to be passed to the method.
*/
public void send(String method, Object... args) {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active.");
hubConnectionStateLock.lock();
try {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active.");
}
sendInvocationMessage(method, args);
} finally {
hubConnectionStateLock.unlock();
}
sendInvocationMessage(method, args);
}
private void sendInvocationMessage(String method, Object[] args) {
@ -605,26 +609,31 @@ public class HubConnection {
*/
@SuppressWarnings("unchecked")
public Completable invoke(String method, Object... args) {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active.");
hubConnectionStateLock.lock();
try {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active.");
}
String id = connectionState.getNextInvocationId();
CompletableSubject subject = CompletableSubject.create();
InvocationRequest irq = new InvocationRequest(null, id);
connectionState.addInvocation(irq);
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> subject.onComplete(),
error -> subject.onError(error),
() -> subject.onComplete());
// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendInvocationMessage(method, args, id, false);
return subject;
} finally {
hubConnectionStateLock.unlock();
}
String id = connectionState.getNextInvocationId();
CompletableSubject subject = CompletableSubject.create();
InvocationRequest irq = new InvocationRequest(null, id);
connectionState.addInvocation(irq);
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> subject.onComplete(),
error -> subject.onError(error),
() -> subject.onComplete());
// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendInvocationMessage(method, args, id, false);
return subject;
}
/**
@ -638,32 +647,37 @@ public class HubConnection {
*/
@SuppressWarnings("unchecked")
public <T> Single<T> invoke(Class<T> returnType, String method, Object... args) {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active.");
}
String id = connectionState.getNextInvocationId();
SingleSubject<T> subject = SingleSubject.create();
InvocationRequest irq = new InvocationRequest(returnType, id);
connectionState.addInvocation(irq);
// forward the invocation result or error to the user
// run continuations on a separate thread
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onSuccess((T)result);
} else {
subject.onSuccess(returnType.cast(result));
hubConnectionStateLock.lock();
try {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'invoke' method cannot be called if the connection is not active.");
}
}, error -> subject.onError(error));
// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendInvocationMessage(method, args, id, false);
return subject;
String id = connectionState.getNextInvocationId();
InvocationRequest irq = new InvocationRequest(returnType, id);
connectionState.addInvocation(irq);
SingleSubject<T> subject = SingleSubject.create();
// forward the invocation result or error to the user
// run continuations on a separate thread
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onSuccess((T)result);
} else {
subject.onSuccess(returnType.cast(result));
}
}, error -> subject.onError(error));
// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendInvocationMessage(method, args, id, false);
return subject;
} finally {
hubConnectionStateLock.unlock();
}
}
/**
@ -677,33 +691,46 @@ public class HubConnection {
*/
@SuppressWarnings("unchecked")
public <T> Observable<T> stream(Class<T> returnType, String method, Object ... args) {
String invocationId = connectionState.getNextInvocationId();
AtomicInteger subscriptionCount = new AtomicInteger();
InvocationRequest irq = new InvocationRequest(returnType, invocationId);
connectionState.addInvocation(irq);
ReplaySubject<T> subject = ReplaySubject.create();
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onNext((T)result);
} else {
subject.onNext(returnType.cast(result));
String invocationId;
InvocationRequest irq;
hubConnectionStateLock.lock();
try {
if (hubConnectionState != HubConnectionState.CONNECTED) {
throw new RuntimeException("The 'stream' method cannot be called if the connection is not active.");
}
}, error -> subject.onError(error),
() -> subject.onComplete());
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());
sendInvocationMessage(method, args, invocationId, true);
return observable.doOnDispose(() -> {
if (subscriptionCount.decrementAndGet() == 0) {
CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId);
sendHubMessage(cancelInvocationMessage);
connectionState.tryRemoveInvocation(invocationId);
subject.onComplete();
}
});
invocationId = connectionState.getNextInvocationId();
irq = new InvocationRequest(returnType, invocationId);
connectionState.addInvocation(irq);
AtomicInteger subscriptionCount = new AtomicInteger();
ReplaySubject<T> subject = ReplaySubject.create();
Subject<Object> pendingCall = irq.getPendingCall();
pendingCall.subscribe(result -> {
// Primitive types can't be cast with the Class cast function
if (returnType.isPrimitive()) {
subject.onNext((T)result);
} else {
subject.onNext(returnType.cast(result));
}
}, error -> subject.onError(error),
() -> subject.onComplete());
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());
sendInvocationMessage(method, args, invocationId, true);
return observable.doOnDispose(() -> {
if (subscriptionCount.decrementAndGet() == 0) {
CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId);
sendHubMessage(cancelInvocationMessage);
if (connectionState != null) {
connectionState.tryRemoveInvocation(invocationId);
}
subject.onComplete();
}
});
} finally {
hubConnectionStateLock.unlock();
}
}
private void sendHubMessage(HubMessage message) {

View File

@ -1608,6 +1608,15 @@ class HubConnectionTest {
assertEquals("The 'invoke' method cannot be called if the connection is not active.", exception.getMessage());
}
@Test
public void cannotStreamBeforeStart() {
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
Throwable exception = assertThrows(RuntimeException.class, () -> hubConnection.stream(String.class, "inc", "arg1"));
assertEquals("The 'stream' method cannot be called if the connection is not active.", exception.getMessage());
}
@Test
public void doesNotErrorWhenReceivingInvokeWithIncorrectArgumentLength() {
MockTransport mockTransport = new MockTransport();
@ -2036,7 +2045,7 @@ class HubConnectionTest {
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate", (req) -> {
if(redirectCount.get() == 0){
if (redirectCount.get() == 0) {
redirectCount.incrementAndGet();
redirectToken.set(req.getHeaders().get("Authorization"));
return Single.just(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\",\"accessToken\":\"firstRedirectToken\"}"));

View File

@ -8,7 +8,6 @@ import static org.junit.jupiter.api.Assertions.*;
import java.util.HashMap;
import java.util.stream.Stream;
import io.reactivex.Single;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

View File

@ -37,6 +37,6 @@ public class Chat {
hubConnection.send("Send", message);
}
hubConnection.stop();
hubConnection.stop().blockingAwait();
}
}