Merge branch 'release/2.2'

This commit is contained in:
MikaelMengistu 2018-10-11 16:09:48 -07:00
commit f146c0a369
14 changed files with 73 additions and 155 deletions

View File

@ -25,15 +25,23 @@ sourceCompatibility = 1.8
repositories {
mavenCentral()
// add sonatype repository (temporary, due to java-8-parent being a snapshot)
maven {
url 'https://oss.sonatype.org/content/repositories/snapshots/'
}
}
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.1'
testCompile 'org.junit.jupiter:junit-jupiter-params:5.3.1'
testRuntime 'org.junit.jupiter:junit-jupiter-engine:5.3.1'
implementation 'com.google.code.gson:gson:2.8.5'
implementation 'com.squareup.okhttp3:okhttp:3.11.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'com.microsoft.maven:java-8-parent:8.0.0-SNAPSHOT'
// dependency versions imported from java-8-parent POM imported above
testImplementation 'org.junit.jupiter:junit-jupiter-api'
testCompile 'org.junit.jupiter:junit-jupiter-params'
testRuntime 'org.junit.jupiter:junit-jupiter-engine'
implementation 'com.google.code.gson:gson'
implementation 'com.squareup.okhttp3:okhttp'
implementation 'io.reactivex.rxjava2:rxjava'
implementation 'org.slf4j:slf4j-api'
}
spotless {

View File

@ -1,3 +1,6 @@
rootProject.name = 'signalr'
include 'main'
// This is required for Gradle 4.6+ to support importing BOMs, like we do for the Microsoft super pom.
// See here: https://docs.gradle.org/4.6/release-notes.html?_ga=2.220409368.162752831.1539212384-1601231980.1538950297#bom-import
enableFeaturePreview('IMPROVED_POM_SUPPORT')

View File

@ -1,55 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class ConsoleLogger implements Logger {
private LogLevel logLevel;
private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmZ");
public ConsoleLogger(LogLevel logLevel) {
this.logLevel = logLevel;
}
@Override
public void log(LogLevel logLevel, String message) {
if (logLevel.value >= this.logLevel.value) {
String timeStamp = dateFormat.format(new Date());
message = String.format("[%s] [%s] %s", timeStamp, logLevel, message);
switch (logLevel) {
case Debug:
case Information:
System.out.println(message);
break;
case Warning:
case Error:
case Critical:
System.err.println(message);
break;
}
}
}
@Override
public void log(LogLevel logLevel, String formattedMessage, Object... args) {
if (logLevel.value >= this.logLevel.value) {
String timeStamp = dateFormat.format(new Date());
formattedMessage = String.format("[%s] [%s] %s%n", timeStamp, logLevel, formattedMessage);
switch (logLevel) {
case Debug:
case Information:
System.out.printf(formattedMessage, args);
break;
case Warning:
case Error:
case Critical:
System.err.printf(formattedMessage, args);
break;
}
}
}
}

View File

@ -24,10 +24,8 @@ import okhttp3.ResponseBody;
final class DefaultHttpClient extends HttpClient {
private final OkHttpClient client;
private Logger logger;
public DefaultHttpClient(Logger logger) {
this.logger = logger;
public DefaultHttpClient() {
this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock();
@ -124,6 +122,6 @@ final class DefaultHttpClient extends HttpClient {
@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
return new OkHttpWebSocketWrapper(url, headers, client, logger);
return new OkHttpWebSocketWrapper(url, headers, client);
}
}

View File

@ -12,7 +12,6 @@ import io.reactivex.Single;
public class HttpHubConnectionBuilder {
private final String url;
private Transport transport;
private Logger logger;
private HttpClient httpClient;
private boolean skipNegotiate;
private Single<String> accessTokenProvider;
@ -28,17 +27,11 @@ public class HttpHubConnectionBuilder {
return this;
}
public HttpHubConnectionBuilder withHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
return this;
}
public HttpHubConnectionBuilder configureLogging(LogLevel logLevel) {
this.logger = new ConsoleLogger(logLevel);
return this;
}
public HttpHubConnectionBuilder shouldSkipNegotiate(boolean skipNegotiate) {
this.skipNegotiate = skipNegotiate;
return this;
@ -49,16 +42,6 @@ public class HttpHubConnectionBuilder {
return this;
}
public HttpHubConnectionBuilder configureLogging(Logger logger) {
this.logger = logger;
return this;
}
public HttpHubConnectionBuilder withLogger(Logger logger) {
this.logger = logger;
return this;
}
public HttpHubConnectionBuilder withHandshakeResponseTimeout(Duration timeout) {
this.handshakeResponseTimeout = timeout;
return this;
@ -78,6 +61,6 @@ public class HttpHubConnectionBuilder {
}
public HubConnection build() {
return new HubConnection(url, transport, skipNegotiate, logger, httpClient, accessTokenProvider, handshakeResponseTimeout, headers);
return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers);
}
}

View File

@ -18,6 +18,9 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.reactivex.Completable;
import io.reactivex.Single;
@ -34,7 +37,6 @@ public class HubConnection {
private Boolean handshakeReceived = false;
private HubConnectionState hubConnectionState = HubConnectionState.DISCONNECTED;
private final Lock hubConnectionStateLock = new ReentrantLock();
private Logger logger;
private List<Consumer<Exception>> onClosedCallbackList;
private final boolean skipNegotiate;
private Single<String> accessTokenProvider;
@ -50,6 +52,8 @@ public class HubConnection {
private Duration tickRate = Duration.ofSeconds(1);
private CompletableFuture<Void> handshakeResponseFuture;
private Duration handshakeResponseTimeout = Duration.ofSeconds(15);
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
public void setServerTimeout(Duration serverTimeout) {
this.serverTimeout = serverTimeout;
@ -72,7 +76,7 @@ public class HubConnection {
this.tickRate = tickRate;
}
HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient,
HubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient httpClient,
Single<String> accessTokenProvider, Duration handshakeResponseTimeout, Map<String, String> headers) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
@ -90,13 +94,7 @@ public class HubConnection {
if (httpClient != null) {
this.httpClient = httpClient;
} else {
this.httpClient = new DefaultHttpClient(this.logger);
}
if (logger != null) {
this.logger = logger;
} else {
this.logger = new NullLogger();
this.httpClient = new DefaultHttpClient();
}
if (transport != null) {
@ -128,7 +126,7 @@ public class HubConnection {
}
if (handshakeResponse.getHandshakeError() != null) {
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
logger.log(LogLevel.Error, errorMessage);
logger.error(errorMessage);
RuntimeException exception = new RuntimeException(errorMessage);
handshakeResponseFuture.completeExceptionally(exception);
throw exception;
@ -146,7 +144,7 @@ public class HubConnection {
HubMessage[] messages = protocol.parseMessages(payload, connectionState);
for (HubMessage message : messages) {
logger.log(LogLevel.Debug, "Received message of type %s.", message.getMessageType());
logger.debug("Received message of type %s.", message.getMessageType());
switch (message.getMessageType()) {
case INVOCATION:
InvocationMessage invocationMessage = (InvocationMessage) message;
@ -156,11 +154,11 @@ public class HubConnection {
handler.getAction().invoke(invocationMessage.getArguments());
}
} else {
logger.log(LogLevel.Warning, "Failed to find handler for '%s' method.", invocationMessage.getTarget());
logger.warn("Failed to find handler for '%s' method.", invocationMessage.getTarget());
}
break;
case CLOSE:
logger.log(LogLevel.Information, "Close message received from server.");
logger.info("Close message received from server.");
CloseMessage closeMessage = (CloseMessage) message;
stop(closeMessage.getError());
break;
@ -171,7 +169,7 @@ public class HubConnection {
CompletionMessage completionMessage = (CompletionMessage)message;
InvocationRequest irq = connectionState.tryRemoveInvocation(completionMessage.getInvocationId());
if (irq == null) {
logger.log(LogLevel.Warning, "Dropped unsolicited Completion message for invocation '%s'.", completionMessage.getInvocationId());
logger.warn("Dropped unsolicited Completion message for invocation '%s'.", completionMessage.getInvocationId());
continue;
}
irq.complete(completionMessage);
@ -179,7 +177,7 @@ public class HubConnection {
case STREAM_INVOCATION:
case STREAM_ITEM:
case CANCEL_INVOCATION:
logger.log(LogLevel.Error, "This client does not support %s messages.", message.getMessageType());
logger.error("This client does not support %s messages.", message.getMessageType());
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", message.getMessageType()));
}
@ -257,9 +255,9 @@ public class HubConnection {
}
return Completable.fromFuture(negotiate.thenCompose(url -> {
logger.log(LogLevel.Debug, "Starting HubConnection.");
logger.debug("Starting HubConnection.");
if (transport == null) {
transport = new WebSocketTransport(headers, httpClient, logger);
transport = new WebSocketTransport(headers, httpClient);
}
transport.setOnReceive(this.callback);
@ -275,7 +273,7 @@ public class HubConnection {
try {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");
logger.info("HubConnection started.");
resetServerTimeout();
this.pingTimer = new Timer();
@ -292,7 +290,7 @@ public class HubConnection {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.log(LogLevel.Warning, String.format("Error sending ping: %s", e.getMessage()));
logger.warn(String.format("Error sending ping: %s", e.getMessage()));
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
@ -353,9 +351,9 @@ public class HubConnection {
if (errorMessage != null) {
stopError = errorMessage;
logger.log(LogLevel.Error, "HubConnection disconnected with an error: %s.", errorMessage);
logger.error("HubConnection disconnected with an error: %s.", errorMessage);
} else {
logger.log(LogLevel.Debug, "Stopping HubConnection.");
logger.debug("Stopping HubConnection.");
}
} finally {
hubConnectionStateLock.unlock();
@ -383,11 +381,11 @@ public class HubConnection {
}
if (errorMessage != null) {
exception = new RuntimeException(errorMessage);
logger.log(LogLevel.Error, "HubConnection disconnected with an error %s.", errorMessage);
logger.error("HubConnection disconnected with an error %s.", errorMessage);
}
connectionState.cancelOutstandingInvocations(exception);
connectionState = null;
logger.log(LogLevel.Information, "HubConnection stopped.");
logger.info("HubConnection stopped.");
hubConnectionState = HubConnectionState.DISCONNECTED;
handshakeResponseFuture.complete(null);
} finally {
@ -453,9 +451,9 @@ public class HubConnection {
private void sendHubMessage(HubMessage message) {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
logger.log(LogLevel.Debug, "Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
logger.debug("Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
} else {
logger.log(LogLevel.Debug, "Sending %s message.", message.getMessageType().name());
logger.debug("Sending %s message.", message.getMessageType().name());
}
transport.send(serializedMessage);
@ -477,7 +475,7 @@ public class HubConnection {
*/
public void remove(String name) {
handlers.remove(name);
logger.log(LogLevel.Trace, "Removing handlers for client method: %s.", name);
logger.trace("Removing handlers for client method: %s.", name);
}
public void onClosed(Consumer<Exception> callback) {
@ -695,7 +693,7 @@ public class HubConnection {
private Subscription registerHandler(String target, ActionBase action, Class<?>... types) {
InvocationHandler handler = handlers.put(target, action, types);
logger.log(LogLevel.Debug, "Registering handler for client method: '%s'.", target);
logger.debug("Registering handler for client method: '%s'.", target);
return new Subscription(handlers, handler, target);
}
@ -779,7 +777,7 @@ public class HubConnection {
public List<Class<?>> getParameterTypes(String methodName) {
List<InvocationHandler> handlers = connection.handlers.get(methodName);
if (handlers == null) {
logger.log(LogLevel.Warning, "Failed to find handler for '%s' method.", methodName);
logger.warn("Failed to find handler for '%s' method.", methodName);
return emptyArray;
}

View File

@ -1,9 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
public interface Logger {
void log(LogLevel logLevel, String message);
void log(LogLevel logLevel, String formattedMessage, Object ... args);
}

View File

@ -1,12 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
package com.microsoft.signalr;
class NullLogger implements Logger {
@Override
public void log(LogLevel logLevel, String message) { }
@Override
public void log(LogLevel logLevel, String formattedMessage, Object... args) { }
}

View File

@ -7,6 +7,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -19,17 +22,17 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
private String url;
private Map<String, String> headers;
private OkHttpClient client;
private Logger logger;
private OnReceiveCallBack onReceive;
private BiConsumer<Integer, String> onClose;
private CompletableFuture<Void> startFuture = new CompletableFuture<>();
private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
public OkHttpWebSocketWrapper(String url, Map<String, String> headers, OkHttpClient client, Logger logger) {
private final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketWrapper.class);
public OkHttpWebSocketWrapper(String url, Map<String, String> headers, OkHttpClient client) {
this.url = url;
this.headers = headers;
this.client = client;
this.logger = logger;
}
@Override
@ -95,7 +98,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.log(LogLevel.Error, "Websocket closed from an error: %s.", t.getMessage());
logger.error("Websocket closed from an error: %s.", t.getMessage());
closeFuture.completeExceptionally(new RuntimeException(t));
onClose.accept(null, t.getMessage());
checkStartFailure();
@ -106,7 +109,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
// exceptionally.
if (!startFuture.isDone()) {
String errorMessage = "There was an error starting the Websockets transport.";
logger.log(LogLevel.Debug, errorMessage);
logger.error("Websocket closed from an error: %s.", errorMessage);
startFuture.completeExceptionally(new RuntimeException(errorMessage));
}
}

View File

@ -7,22 +7,26 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class WebSocketTransport implements Transport {
private WebSocketWrapper webSocketClient;
private OnReceiveCallBack onReceiveCallBack;
private Consumer<String> onClose;
private String url;
private Logger logger;
private final HttpClient client;
private final Map<String, String> headers;
private final Logger logger = LoggerFactory.getLogger(WebSocketTransport.class);
private static final String HTTP = "http";
private static final String HTTPS = "https";
private static final String WS = "ws";
private static final String WSS = "wss";
public WebSocketTransport(Map<String, String> headers, HttpClient client, Logger logger) {
this.logger = logger;
public WebSocketTransport(Map<String, String> headers, HttpClient client) {
this.client = client;
this.headers = headers;
}
@ -44,7 +48,7 @@ class WebSocketTransport implements Transport {
@Override
public CompletableFuture<Void> start(String url) {
this.url = formatUrl(url);
logger.log(LogLevel.Debug, "Starting Websocket connection.");
logger.debug("Starting Websocket connection.");
this.webSocketClient = client.createWebSocket(this.url, this.headers);
this.webSocketClient.setOnReceive((message) -> onReceive(message));
this.webSocketClient.setOnClose((code, reason) -> {
@ -53,7 +57,7 @@ class WebSocketTransport implements Transport {
}
});
return webSocketClient.start().thenRun(() -> logger.log(LogLevel.Information, "WebSocket transport connected to: %s.", this.url));
return webSocketClient.start().thenRun(() -> logger.info("WebSocket transport connected to: %s.", this.url));
}
@Override
@ -64,7 +68,7 @@ class WebSocketTransport implements Transport {
@Override
public void setOnReceive(OnReceiveCallBack callback) {
this.onReceiveCallBack = callback;
logger.log(LogLevel.Debug, "OnReceived callback has been set.");
logger.debug("OnReceived callback has been set.");
}
@Override
@ -79,11 +83,11 @@ class WebSocketTransport implements Transport {
@Override
public CompletableFuture<Void> stop() {
return webSocketClient.stop().whenComplete((i, j) -> logger.log(LogLevel.Information, "WebSocket connection stopped."));
return webSocketClient.stop().whenComplete((i, j) -> logger.info("WebSocket connection stopped."));
}
void onClose(int code, String reason) {
logger.log(LogLevel.Information, "WebSocket connection stopping with " +
logger.info("WebSocket connection stopping with " +
"code %d and reason '%s'.", code, reason);
if (code != 1000) {
onClose.accept(reason);

View File

@ -5,19 +5,18 @@ package com.microsoft.signalr;
class TestUtils {
static HubConnection createHubConnection(String url) {
return createHubConnection(url, new MockTransport(true), new NullLogger(), true, new TestHttpClient());
return createHubConnection(url, new MockTransport(true), true, new TestHttpClient());
}
static HubConnection createHubConnection(String url, Transport transport) {
return createHubConnection(url, transport, new NullLogger(), true, new TestHttpClient());
return createHubConnection(url, transport, true, new TestHttpClient());
}
static HubConnection createHubConnection(String url, Transport transport, Logger logger, boolean skipNegotiate, HttpClient client) {
static HubConnection createHubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient client) {
HttpHubConnectionBuilder builder = HubConnectionBuilder.create(url)
.withTransport(transport)
.withHttpClient(client)
.shouldSkipNegotiate(skipNegotiate)
.withLogger(logger);
.shouldSkipNegotiate(skipNegotiate);
return builder.build();
}

View File

@ -14,7 +14,7 @@ import org.junit.jupiter.api.Test;
class WebSocketTransportTest {
@Test
public void WebsocketThrowsIfItCantConnect() {
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient());
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS));
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
}

View File

@ -24,7 +24,7 @@ class WebSocketTransportUrlFormatTest {
@ParameterizedTest
@MethodSource("protocols")
public void checkWebsocketUrlProtocol(String url, String expectedUrl) {
WebSocketTransport webSocketTransport = new WebSocketTransport(new HashMap<>(), new TestHttpClient(), new NullLogger());
WebSocketTransport webSocketTransport = new WebSocketTransport(new HashMap<>(), new TestHttpClient());
try {
webSocketTransport.start(url);
} catch (Exception e) {}

View File

@ -7,7 +7,6 @@ import java.util.Scanner;
import com.microsoft.signalr.HubConnection;
import com.microsoft.signalr.HubConnectionBuilder;
import com.microsoft.signalr.LogLevel;
public class Chat {
public static void main(String[] args) {
@ -18,8 +17,7 @@ public class Chat {
System.out.print("Enter your name:");
String enteredName = reader.nextLine();
HubConnection hubConnection = HubConnectionBuilder.create(input)
.configureLogging(LogLevel.Information).build();
HubConnection hubConnection = HubConnectionBuilder.create(input).build();
hubConnection.on("Send", (name, message) -> {
System.out.println(name + ": " + message);