Merge branch 'release/2.2'

This commit is contained in:
unknown 2018-08-06 11:07:16 -07:00
commit beb546e8df
10 changed files with 174 additions and 19 deletions

View File

@ -9,7 +9,7 @@ public class Chat {
Scanner reader = new Scanner(System.in); // Reading from System.in
String input;
input = reader.nextLine();
HubConnection hubConnection = new HubConnection(input);
HubConnection hubConnection = new HubConnection(input, LogLevel.Information);
hubConnection.on("Send", (message) -> {
System.out.println("REGISTERED HANDLER: " + message);

View File

@ -0,0 +1,44 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
public class ConsoleLogger implements Logger {
private LogLevel logLevel;
public ConsoleLogger(LogLevel logLevel) {
this.logLevel = logLevel;
}
@Override
public void log(LogLevel logLevel, String message) {
if(logLevel.value >= this.logLevel.value){
switch (logLevel) {
case Debug:
case Information:
System.out.println(message);
break;
case Warning:
case Error:
case Critical:
System.err.println(message);
break;
}
}
}
@Override
public void log(LogLevel logLevel, String formattedMessage, Object... args) {
if (logLevel.value >= this.logLevel.value) {
formattedMessage = formattedMessage + "%n";
switch (logLevel) {
case Debug:
case Information:
System.out.printf(formattedMessage, args);
break;
case Warning:
case Error:
case Critical:
System.err.printf(formattedMessage, args);
break;
}
}
}
}

View File

@ -18,10 +18,12 @@ public class HubConnection {
private Boolean handshakeReceived = false;
private static final String RECORD_SEPARATOR = "\u001e";
private HubConnectionState connectionState = HubConnectionState.DISCONNECTED;
private Logger logger;
public HubConnection(String url, Transport transport) {
public HubConnection(String url, Transport transport, Logger logger){
this.url = url;
this.protocol = new JsonHubProtocol();
this.logger = logger;
this.callback = (payload) -> {
if (!handshakeReceived) {
@ -29,7 +31,9 @@ public class HubConnection {
String handshakeResponseString = payload.substring(0, handshakeLength - 1);
HandshakeResponseMessage handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
if (handshakeResponse.error != null) {
throw new Exception("Error in handshake " + handshakeResponse.error);
String errorMessage = "Error in handshake " + handshakeResponse.error;
logger.log(LogLevel.Error, errorMessage);
throw new Exception(errorMessage);
}
handshakeReceived = true;
@ -43,17 +47,21 @@ public class HubConnection {
HubMessage[] messages = protocol.parseMessages(payload);
for (HubMessage message : messages) {
logger.log(LogLevel.Debug,"Received message of type %s", message.getMessageType());
switch (message.getMessageType()) {
case INVOCATION:
InvocationMessage invocationMessage = (InvocationMessage)message;
if (message != null && handlers.containsKey(invocationMessage.target)) {
if (handlers.containsKey(invocationMessage.target)) {
ArrayList<Object> args = gson.fromJson((JsonArray)invocationMessage.arguments[0], (new ArrayList<>()).getClass());
List<ActionBase> actions = handlers.get(invocationMessage.target);
if (actions != null) {
logger.log(LogLevel.Debug, "Invoking handlers for target %s", invocationMessage.target);
for (ActionBase action: actions) {
action.invoke(args.toArray());
}
}
} else {
logger.log(LogLevel.Warning, "Failed to find handler for %s method", invocationMessage.target);
}
break;
case STREAM_INVOCATION:
@ -61,10 +69,11 @@ public class HubConnection {
case CLOSE:
case CANCEL_INVOCATION:
case COMPLETION:
throw new UnsupportedOperationException("The message type " + message.getMessageType() + " is not supported yet.");
logger.log(LogLevel.Error, "This client does not support %s messages", message.getMessageType());
throw new UnsupportedOperationException(String.format("The message type %s is not supported yet.", message.getMessageType()));
case PING:
// We don't need to do anything in the case of a ping message.
// The other message types aren't supported
break;
}
}
@ -72,7 +81,7 @@ public class HubConnection {
if (transport == null){
try {
this.transport = new WebSocketTransport(this.url);
this.transport = new WebSocketTransport(this.url, this.logger);
} catch (URISyntaxException e) {
e.printStackTrace();
}
@ -81,8 +90,16 @@ public class HubConnection {
}
}
public HubConnection(String url, Transport transport) {
this(url, transport, new NullLogger());
}
public HubConnection(String url) {
this(url, null);
this(url, null, new NullLogger());
}
public HubConnection(String url, LogLevel logLevel){
this(url, null, new ConsoleLogger(logLevel));
}
public HubConnectionState getConnectionState() {
@ -90,33 +107,40 @@ public class HubConnection {
}
public void start() throws Exception {
logger.log(LogLevel.Debug, "Starting HubConnection");
transport.setOnReceive(this.callback);
transport.start();
String handshake = HandshakeProtocol.createHandshakeRequestMessage(new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
transport.send(handshake);
connectionState = HubConnectionState.CONNECTED;
logger.log(LogLevel.Information, "HubConnected started");
}
public void stop(){
logger.log(LogLevel.Debug, "Stopping HubConnection");
transport.stop();
connectionState = HubConnectionState.DISCONNECTED;
logger.log(LogLevel.Information, "HubConnection stopped");
}
public void send(String method, Object... args) throws Exception {
InvocationMessage invocationMessage = new InvocationMessage(method, args);
String message = protocol.writeMessage(invocationMessage);
logger.log(LogLevel.Debug, "Sending message");
transport.send(message);
}
public Subscription on(String target, Action callback) {
ActionBase action = args -> callback.invoke();
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
public <T1> Subscription on(String target, Action1<T1> callback, Class<T1> param1) {
ActionBase action = params -> callback.invoke(param1.cast(params[0]));
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -125,6 +149,7 @@ public class HubConnection {
callback.invoke(param1.cast(params[0]), param2.cast(params[1]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -134,6 +159,7 @@ public class HubConnection {
callback.invoke(param1.cast(params[0]), param2.cast(params[1]), param3.cast(params[2]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -143,6 +169,7 @@ public class HubConnection {
callback.invoke(param1.cast(params[0]), param2.cast(params[1]), param3.cast(params[2]), param4.cast(params[3]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -153,6 +180,7 @@ public class HubConnection {
param5.cast(params[4]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -163,6 +191,7 @@ public class HubConnection {
param5.cast(params[4]) ,param6.cast(params[5]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -173,6 +202,7 @@ public class HubConnection {
param5.cast(params[4]) ,param6.cast(params[5]), param7.cast(params[6]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
@ -183,10 +213,12 @@ public class HubConnection {
param5.cast(params[4]) ,param6.cast(params[5]), param7.cast(params[6]), param8.cast(params[7]));
};
handlers.put(target, action);
logger.log(LogLevel.Trace, "Registering handler for client method: %s", target);
return new Subscription(handlers, action, target);
}
public void remove(String name) {
handlers.remove(name);
logger.log(LogLevel.Trace, "Removing handlers for client method %s" , name);
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
public enum LogLevel {
Trace(0),
Debug(1),
Information(2),
Warning(3),
Error(4),
Critical(5),
None(6);
public int value;
LogLevel(int id) { this.value = id; }
}

View File

@ -0,0 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
public interface Logger {
void log(LogLevel logLevel, String message);
void log(LogLevel logLevel, String formattedMessage, Object ... args);
}

View File

@ -0,0 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
public class NullLogger implements Logger {
@Override
public void log(LogLevel logLevel, String message) { }
@Override
public void log(LogLevel logLevel, String formattedMessage, Object... args) { }
}

View File

@ -11,14 +11,16 @@ public class WebSocketTransport implements Transport {
private WebSocketClient webSocketClient;
private OnReceiveCallBack onReceiveCallBack;
private URI url;
private Logger logger;
private static final String HTTP = "http";
private static final String HTTPS = "https";
private static final String WS = "ws";
private static final String WSS = "wss";
public WebSocketTransport(String url) throws URISyntaxException {
public WebSocketTransport(String url, Logger logger) throws URISyntaxException {
this.url = formatUrl(url);
this.logger = logger;
}
public URI getUrl(){
@ -37,8 +39,10 @@ public class WebSocketTransport implements Transport {
@Override
public void start() throws InterruptedException {
logger.log(LogLevel.Debug, "Starting Websocket connection");
webSocketClient = createWebSocket();
webSocketClient.connectBlocking();
logger.log(LogLevel.Information, "WebSocket transport connected to: %s", webSocketClient.getURI());
}
@Override
@ -49,6 +53,7 @@ public class WebSocketTransport implements Transport {
@Override
public void setOnReceive(OnReceiveCallBack callback) {
this.onReceiveCallBack = callback;
logger.log(LogLevel.Debug, "OnReceived callback has been set");
}
@Override
@ -59,6 +64,7 @@ public class WebSocketTransport implements Transport {
@Override
public void stop() {
webSocketClient.closeConnection(0, "HubConnection Stopped");
logger.log(LogLevel.Information, "WebSocket connection stopped");
}
private WebSocketClient createWebSocket() {

View File

@ -32,7 +32,7 @@ public class WebSocketTransportTest {
@Test
public void checkWebsocketUrlProtocol() throws URISyntaxException {
WebSocketTransport webSocketTransport = new WebSocketTransport(this.url);
WebSocketTransport webSocketTransport = new WebSocketTransport(this.url, new NullLogger());
assertEquals(this.expectedUrl, webSocketTransport.getUrl().toString());
}
}

View File

@ -14,6 +14,7 @@ using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
@ -52,6 +53,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
private long _nextActivationServerTimeout;
private long _nextActivationSendPing;
private bool _disposed;
private bool _hasInherentKeepAlive;
private CancellationToken _uploadStreamToken;
@ -310,6 +312,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
// Start the connection
var connection = await _connectionFactory.ConnectAsync(_protocol.TransferFormat);
var startingConnectionState = new ConnectionState(connection, this);
_hasInherentKeepAlive = connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive ?? false;
// From here on, if an error occurs we need to shut down the connection because
// we still own it.
@ -991,19 +994,25 @@ namespace Microsoft.AspNetCore.SignalR.Client
// await returns True until `timer.Stop()` is called in the `finally` block of `ReceiveLoop`
while (await timer)
{
if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationServerTimeout))
{
OnServerTimeout();
}
if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationSendPing))
{
await PingServer();
}
await RunTimerActions();
}
}
}
// Internal for testing
internal async Task RunTimerActions()
{
if (!_hasInherentKeepAlive && DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationServerTimeout))
{
OnServerTimeout();
}
if (DateTime.UtcNow.Ticks > Volatile.Read(ref _nextActivationSendPing))
{
await PingServer();
}
}
private void OnServerTimeout()
{
_connectionState.CloseException = new TimeoutException(

View File

@ -8,6 +8,7 @@ using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.DependencyInjection;
@ -122,6 +123,33 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
Assert.Equal($"Server timeout ({hubConnection.ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server.", exception.Message);
}
[Fact]
public async Task ServerTimeoutIsDisabledWhenUsingTransportWithInherentKeepAlive()
{
using (StartVerifiableLog(out var loggerFactory))
{
var testConnection = new TestConnection();
testConnection.Features.Set<IConnectionInherentKeepAliveFeature>(new TestKeepAliveFeature() { HasInherentKeepAlive = true });
var hubConnection = CreateHubConnection(testConnection, loggerFactory: loggerFactory);
hubConnection.ServerTimeout = TimeSpan.FromMilliseconds(1);
await hubConnection.StartAsync().OrTimeout();
var closeTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += ex =>
{
closeTcs.TrySetResult(ex);
return Task.CompletedTask;
};
await hubConnection.RunTimerActions().OrTimeout();
Assert.False(closeTcs.Task.IsCompleted);
await hubConnection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task PendingInvocationsAreTerminatedIfServerTimeoutIntervalElapsesWithNoMessages()
{
@ -368,6 +396,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public int Bar { get; private set; }
}
private struct TestKeepAliveFeature : IConnectionInherentKeepAliveFeature
{
public bool HasInherentKeepAlive { get; set; }
}
// Moq really doesn't handle out parameters well, so to make these tests work I added a manual mock -anurse
private class MockHubProtocol : IHubProtocol