Merge pull request #3063 from dotnet-maestro-bot/merge/release/2.2-to-master
[automated] Merge branch 'release/2.2' => 'master'
This commit is contained in:
commit
4aad2c4802
|
|
@ -186,6 +186,7 @@ public class HubConnection {
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handshakeReceived = false;
|
||||||
CompletableFuture<Void> tokenFuture = accessTokenProvider.get()
|
CompletableFuture<Void> tokenFuture = accessTokenProvider.get()
|
||||||
.thenAccept((token) -> {
|
.thenAccept((token) -> {
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
|
|
@ -203,13 +204,13 @@ public class HubConnection {
|
||||||
return negotiate.thenCompose((url) -> {
|
return negotiate.thenCompose((url) -> {
|
||||||
logger.log(LogLevel.Debug, "Starting HubConnection.");
|
logger.log(LogLevel.Debug, "Starting HubConnection.");
|
||||||
if (transport == null) {
|
if (transport == null) {
|
||||||
transport = new WebSocketTransport(url, headers, httpClient, logger);
|
transport = new WebSocketTransport(headers, httpClient, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
transport.setOnReceive(this.callback);
|
transport.setOnReceive(this.callback);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return transport.start().thenCompose((future) -> {
|
return transport.start(url).thenCompose((future) -> {
|
||||||
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
|
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
|
||||||
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
|
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
|
||||||
return transport.send(handshake).thenRun(() -> {
|
return transport.send(handshake).thenRun(() -> {
|
||||||
|
|
@ -289,8 +290,6 @@ public class HubConnection {
|
||||||
HubException hubException = null;
|
HubException hubException = null;
|
||||||
hubConnectionStateLock.lock();
|
hubConnectionStateLock.lock();
|
||||||
try {
|
try {
|
||||||
hubConnectionState = HubConnectionState.DISCONNECTED;
|
|
||||||
|
|
||||||
if (errorMessage != null) {
|
if (errorMessage != null) {
|
||||||
hubException = new HubException(errorMessage);
|
hubException = new HubException(errorMessage);
|
||||||
} else if (t != null) {
|
} else if (t != null) {
|
||||||
|
|
@ -299,6 +298,7 @@ public class HubConnection {
|
||||||
connectionState.cancelOutstandingInvocations(hubException);
|
connectionState.cancelOutstandingInvocations(hubException);
|
||||||
connectionState = null;
|
connectionState = null;
|
||||||
logger.log(LogLevel.Information, "HubConnection stopped.");
|
logger.log(LogLevel.Information, "HubConnection stopped.");
|
||||||
|
hubConnectionState = HubConnectionState.DISCONNECTED;
|
||||||
} finally {
|
} finally {
|
||||||
hubConnectionStateLock.unlock();
|
hubConnectionStateLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,10 @@ class JsonHubProtocol implements HubProtocol {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HubMessage[] parseMessages(String payload, InvocationBinder binder) throws Exception {
|
public HubMessage[] parseMessages(String payload, InvocationBinder binder) throws Exception {
|
||||||
|
if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) {
|
||||||
|
throw new RuntimeException("Message is incomplete.");
|
||||||
|
}
|
||||||
|
|
||||||
String[] messages = payload.split(RECORD_SEPARATOR);
|
String[] messages = payload.split(RECORD_SEPARATOR);
|
||||||
List<HubMessage> hubMessages = new ArrayList<>();
|
List<HubMessage> hubMessages = new ArrayList<>();
|
||||||
for (String str : messages) {
|
for (String str : messages) {
|
||||||
|
|
@ -48,7 +52,6 @@ class JsonHubProtocol implements HubProtocol {
|
||||||
JsonArray argumentsToken = null;
|
JsonArray argumentsToken = null;
|
||||||
Object result = null;
|
Object result = null;
|
||||||
JsonElement resultToken = null;
|
JsonElement resultToken = null;
|
||||||
|
|
||||||
JsonReader reader = new JsonReader(new StringReader(str));
|
JsonReader reader = new JsonReader(new StringReader(str));
|
||||||
reader.beginObject();
|
reader.beginObject();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package com.microsoft.aspnet.signalr;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
interface Transport {
|
interface Transport {
|
||||||
CompletableFuture<Void> start() throws Exception;
|
CompletableFuture<Void> start(String url);
|
||||||
CompletableFuture<Void> send(String message);
|
CompletableFuture<Void> send(String message);
|
||||||
void setOnReceive(OnReceiveCallBack callback);
|
void setOnReceive(OnReceiveCallBack callback);
|
||||||
void onReceive(String message) throws Exception;
|
void onReceive(String message) throws Exception;
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,7 @@ class WebSocketTransport implements Transport {
|
||||||
private static final String WS = "ws";
|
private static final String WS = "ws";
|
||||||
private static final String WSS = "wss";
|
private static final String WSS = "wss";
|
||||||
|
|
||||||
public WebSocketTransport(String url, Map<String, String> headers, HttpClient client, Logger logger) {
|
public WebSocketTransport(Map<String, String> headers, HttpClient client, Logger logger) {
|
||||||
this.url = formatUrl(url);
|
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.headers = headers;
|
this.headers = headers;
|
||||||
|
|
@ -41,7 +40,8 @@ class WebSocketTransport implements Transport {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> start() {
|
public CompletableFuture<Void> start(String url) {
|
||||||
|
this.url = formatUrl(url);
|
||||||
logger.log(LogLevel.Debug, "Starting Websocket connection.");
|
logger.log(LogLevel.Debug, "Starting Websocket connection.");
|
||||||
this.webSocketClient = client.createWebSocket(this.url, this.headers);
|
this.webSocketClient = client.createWebSocket(this.url, this.headers);
|
||||||
this.webSocketClient.setOnReceive((message) -> onReceive(message));
|
this.webSocketClient.setOnReceive((message) -> onReceive(message));
|
||||||
|
|
|
||||||
|
|
@ -1019,6 +1019,7 @@ class HubConnectionTest {
|
||||||
|
|
||||||
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
|
hubConnection.start().get(1000, TimeUnit.MILLISECONDS);
|
||||||
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
|
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
|
||||||
|
assertEquals("http://testexample.com/?id=bVOiRPG8-6YiJ6d7ZcTOVQ", transport.getUrl());
|
||||||
hubConnection.stop();
|
hubConnection.stop();
|
||||||
assertEquals("Bearer newToken", token.get());
|
assertEquals("Bearer newToken", token.get());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -246,6 +246,16 @@ class JsonHubProtocolTest {
|
||||||
assertEquals("Invocation provides 1 argument(s) but target expects 2.", exception.getMessage());
|
assertEquals("Invocation provides 1 argument(s) but target expects 2.", exception.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void errorWhileParsingIncompleteMessage() throws Exception {
|
||||||
|
String stringifiedMessage = "{\"type\":1,\"target\":\"test\",\"arguments\":";
|
||||||
|
TestBinder binder = new TestBinder(new InvocationMessage(null, "test", new Object[] { 42, 24 }));
|
||||||
|
|
||||||
|
RuntimeException exception = assertThrows(RuntimeException.class,
|
||||||
|
() -> jsonHubProtocol.parseMessages(stringifiedMessage, binder));
|
||||||
|
assertEquals("Message is incomplete.", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
private class TestBinder implements InvocationBinder {
|
private class TestBinder implements InvocationBinder {
|
||||||
private Class<?>[] paramTypes = null;
|
private Class<?>[] paramTypes = null;
|
||||||
private Class<?> returnType = null;
|
private Class<?> returnType = null;
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,11 @@ import java.util.concurrent.CompletableFuture;
|
||||||
class MockTransport implements Transport {
|
class MockTransport implements Transport {
|
||||||
private OnReceiveCallBack onReceiveCallBack;
|
private OnReceiveCallBack onReceiveCallBack;
|
||||||
private ArrayList<String> sentMessages = new ArrayList<>();
|
private ArrayList<String> sentMessages = new ArrayList<>();
|
||||||
|
private String url;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture start() {
|
public CompletableFuture start(String url) {
|
||||||
|
this.url = url;
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -43,4 +45,8 @@ class MockTransport implements Transport {
|
||||||
public String[] getSentMessages() {
|
public String[] getSentMessages() {
|
||||||
return sentMessages.toArray(new String[sentMessages.size()]);
|
return sentMessages.toArray(new String[sentMessages.size()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getUrl() {
|
||||||
|
return this.url;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,8 +14,8 @@ import org.junit.jupiter.api.Test;
|
||||||
class WebSocketTransportTest {
|
class WebSocketTransportTest {
|
||||||
@Test
|
@Test
|
||||||
public void WebsocketThrowsIfItCantConnect() throws Exception {
|
public void WebsocketThrowsIfItCantConnect() throws Exception {
|
||||||
Transport transport = new WebSocketTransport("http://www.notarealurl12345.fake", new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
|
Transport transport = new WebSocketTransport(new HashMap<>(), new DefaultHttpClient(new NullLogger()), new NullLogger());
|
||||||
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start().get(1, TimeUnit.SECONDS));
|
ExecutionException exception = assertThrows(ExecutionException.class, () -> transport.start("http://www.example.com").get(1, TimeUnit.SECONDS));
|
||||||
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
|
assertEquals("There was an error starting the Websockets transport.", exception.getCause().getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,8 +24,11 @@ class WebSocketTransportUrlFormatTest {
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("protocols")
|
@MethodSource("protocols")
|
||||||
public void checkWebsocketUrlProtocol(String url, String expectedUrl) throws URISyntaxException {
|
public void checkWebsocketUrlProtocol(String url, String expectedUrl) {
|
||||||
WebSocketTransport webSocketTransport = new WebSocketTransport(url, new HashMap<>(), new TestHttpClient(), new NullLogger());
|
WebSocketTransport webSocketTransport = new WebSocketTransport(new HashMap<>(), new TestHttpClient(), new NullLogger());
|
||||||
|
try {
|
||||||
|
webSocketTransport.start(url);
|
||||||
|
} catch (Exception e) {}
|
||||||
assertEquals(expectedUrl, webSocketTransport.getUrl());
|
assertEquals(expectedUrl, webSocketTransport.getUrl());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -92,7 +92,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[ConditionalTheory]
|
[ConditionalTheory(Skip= "https://github.com/aspnet/SignalR/issues/3058")]
|
||||||
[SkipIfDockerNotPresent]
|
[SkipIfDockerNotPresent]
|
||||||
[MemberData(nameof(TransportTypesAndProtocolTypes))]
|
[MemberData(nameof(TransportTypesAndProtocolTypes))]
|
||||||
public async Task CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser(HttpTransportType transportType, string protocolName)
|
public async Task CanSendAndReceiveUserMessagesFromMultipleConnectionsWithSameUser(HttpTransportType transportType, string protocolName)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue