[Java] Safely call onError on Subjects (#31779) (#32026)

This commit is contained in:
Brennan 2021-05-04 16:05:05 -07:00 committed by GitHub
parent 7e932906eb
commit 2d1e9e007f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 8 deletions

View File

@ -1410,14 +1410,14 @@ public class HubConnection implements AutoCloseable {
handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString); handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex); RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex);
handshakeResponseSubject.onError(exception); errorHandshake(exception);
throw exception; throw exception;
} }
if (handshakeResponse.getHandshakeError() != null) { if (handshakeResponse.getHandshakeError() != null) {
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError(); String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
logger.error(errorMessage); logger.error(errorMessage);
RuntimeException exception = new RuntimeException(errorMessage); RuntimeException exception = new RuntimeException(errorMessage);
handshakeResponseSubject.onError(exception); errorHandshake(exception);
throw exception; throw exception;
} }
handshakeReceived = true; handshakeReceived = true;
@ -1428,12 +1428,7 @@ public class HubConnection implements AutoCloseable {
public void timeoutHandshakeResponse(long timeout, TimeUnit unit) { public void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
handshakeTimeout = Executors.newSingleThreadScheduledExecutor(); handshakeTimeout = Executors.newSingleThreadScheduledExecutor();
handshakeTimeout.schedule(() -> { handshakeTimeout.schedule(() -> {
// If onError is called on a completed subject the global error handler is called errorHandshake(new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable()))
{
handshakeResponseSubject.onError(
new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
}
}, timeout, unit); }, timeout, unit);
} }
@ -1473,6 +1468,18 @@ public class HubConnection implements AutoCloseable {
return handlers.get(0).getTypes(); return handlers.get(0).getTypes();
} }
private void errorHandshake(Exception error) {
lock.lock();
try {
// If onError is called on a completed subject the global error handler is called
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) {
handshakeResponseSubject.onError(error);
}
} finally {
lock.unlock();
}
}
} }
// We don't have reconnect yet, but this helps align the Java client with the .NET client // We don't have reconnect yet, but this helps align the Java client with the .NET client

View File

@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import io.reactivex.Completable; import io.reactivex.Completable;
@ -29,6 +30,7 @@ import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject; import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.SingleSubject; import io.reactivex.subjects.SingleSubject;
@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})
class HubConnectionTest { class HubConnectionTest {
private static final String RECORD_SEPARATOR = "\u001e"; private static final String RECORD_SEPARATOR = "\u001e";
private static final Type booleanType = (new TypeReference<Boolean>() { }).getType(); private static final Type booleanType = (new TypeReference<Boolean>() { }).getType();

View File

@ -0,0 +1,41 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import io.reactivex.plugins.RxJavaPlugins;
// Use by adding "@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})" to a test class
class RxJavaUnhandledExceptionsExtensions implements BeforeAllCallback, AfterAllCallback {
private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<Throwable>();
@Override
public void beforeAll(final ExtensionContext context) {
RxJavaPlugins.setErrorHandler(error -> {
errors.put(error);
});
}
@Override
public void afterAll(final ExtensionContext context) {
if (errors.size() != 0) {
String RxErrors = "";
for (final Throwable throwable : errors) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
throwable.printStackTrace(printWriter);
RxErrors += String.format("%s\n", stringWriter.toString());
}
throw new RuntimeException(RxErrors);
}
}
}