[TS] Wait for handshake response (#2983)

This commit is contained in:
BrennanConroy 2018-09-20 14:23:43 -07:00 committed by GitHub
parent f0a34a4ca4
commit 4dfd93c1d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 69 deletions

View File

@ -55,7 +55,11 @@ export class HandshakeProtocol {
// At this point we should have just the single handshake message
const messages = TextMessageFormat.parse(messageData);
responseMessage = JSON.parse(messages[0]);
const response = JSON.parse(messages[0]);
if (response.type) {
throw new Error("Expected a handshake response from the server.");
}
responseMessage = response;
// multiple messages could have arrived with handshake
// return additional data to be parsed as usual, or null if all parsed

View File

@ -31,6 +31,8 @@ export class HubConnection {
private id: number;
private closedCallbacks: Array<(error?: Error) => void>;
private receivedHandshakeResponse: boolean;
private handshakeResolver!: (value?: PromiseLike<{}>) => void;
private handshakeRejecter!: (reason?: any) => void;
private connectionState: HubConnectionState;
// The type of these a) doesn't matter and b) varies when building in browser and node contexts
@ -106,6 +108,11 @@ export class HubConnection {
this.logger.log(LogLevel.Debug, "Starting HubConnection.");
this.receivedHandshakeResponse = false;
// Set up the promise before any connection is started otherwise it could race with received messages
const handshakePromise = new Promise((resolve, reject) => {
this.handshakeResolver = resolve;
this.handshakeRejecter = reject;
});
await this.connection.start(this.protocol.transferFormat);
@ -120,6 +127,8 @@ export class HubConnection {
this.resetTimeoutPeriod();
this.resetKeepAliveInterval();
// Wait for the handshake to complete before marking connection as connected
await handshakePromise;
this.connectionState = HubConnectionState.Connected;
}
@ -388,19 +397,23 @@ export class HubConnection {
// We don't want to wait on the stop itself.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(error);
this.handshakeRejecter(error);
throw error;
}
if (responseMessage.error) {
const message = "Server returned handshake error: " + responseMessage.error;
this.logger.log(LogLevel.Error, message);
this.handshakeRejecter(message);
// We don't want to wait on the stop itself.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(new Error(message));
throw new Error(message);
} else {
this.logger.log(LogLevel.Debug, "Server handshake complete.");
}
this.handshakeResolver();
return remainingData;
}

View File

@ -23,7 +23,7 @@ registerUnhandledRejectionHandler();
describe("HubConnection", () => {
describe("start", () => {
it("sends negotiation message", async () => {
it("sends handshake message", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
@ -163,12 +163,16 @@ describe("HubConnection", () => {
protocolCalled = true;
};
const connection = new TestConnection();
const connection = new TestConnection(false);
const hubConnection = createHubConnection(connection, logger, mockProtocol);
try {
let startCompleted = false;
const startPromise = hubConnection.start().then(() => startCompleted = true);
const data = "{}" + TextMessageFormat.RecordSeparator;
expect(startCompleted).toBe(false);
connection.receiveText(data);
await startPromise;
// message only contained handshake response
expect(protocolCalled).toEqual(false);
@ -187,13 +191,18 @@ describe("HubConnection", () => {
protocolCalled = true;
};
const connection = new TestConnection();
const connection = new TestConnection(false);
const hubConnection = createHubConnection(connection, logger, mockProtocol);
try {
let startCompleted = false;
const startPromise = hubConnection.start().then(() => startCompleted = true);
expect(startCompleted).toBe(false);
// handshake response + message separator
const data = [0x7b, 0x7d, 0x1e];
connection.receiveBinary(new Uint8Array(data).buffer);
await startPromise;
// message only contained handshake response
expect(protocolCalled).toEqual(false);
@ -210,9 +219,13 @@ describe("HubConnection", () => {
const mockProtocol = new TestProtocol(TransferFormat.Binary);
mockProtocol.onreceive = (d) => receivedProcotolData = d as ArrayBuffer;
const connection = new TestConnection();
const connection = new TestConnection(false);
const hubConnection = createHubConnection(connection, logger, mockProtocol);
try {
let startCompleted = false;
const startPromise = hubConnection.start().then(() => startCompleted = true);
expect(startCompleted).toBe(false);
// handshake response + message separator + message pack message
const data = [
0x7b, 0x7d, 0x1e, 0x65, 0x95, 0x03, 0x80, 0xa1, 0x30, 0x01, 0xd9, 0x5d, 0x54, 0x68, 0x65, 0x20, 0x63, 0x6c,
@ -224,6 +237,7 @@ describe("HubConnection", () => {
];
connection.receiveBinary(new Uint8Array(data).buffer);
await startPromise;
// left over data is the message pack message
expect(receivedProcotolData!.byteLength).toEqual(102);
@ -240,12 +254,17 @@ describe("HubConnection", () => {
const mockProtocol = new TestProtocol(TransferFormat.Text);
mockProtocol.onreceive = (d) => receivedProcotolData = d as string;
const connection = new TestConnection();
const connection = new TestConnection(false);
const hubConnection = createHubConnection(connection, logger, mockProtocol);
try {
let startCompleted = false;
const startPromise = hubConnection.start().then(() => startCompleted = true);
expect(startCompleted).toBe(false);
const data = "{}" + TextMessageFormat.RecordSeparator + "{\"type\":6}" + TextMessageFormat.RecordSeparator;
connection.receiveText(data);
await startPromise;
expect(receivedProcotolData).toEqual("{\"type\":6}" + TextMessageFormat.RecordSeparator);
} finally {
@ -259,7 +278,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const invokePromise = hubConnection.invoke("testMethod", "arg", 42);
@ -277,7 +296,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const invokePromise = hubConnection.invoke("testMethod", "arg", 42);
@ -296,7 +315,7 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
connection.receiveHandshakeResponse();
await hubConnection.start();
const invokePromise = hubConnection.invoke("testMethod");
await hubConnection.stop();
@ -311,7 +330,7 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const invokePromise = hubConnection.invoke("testMethod");
// Typically this would be called by the transport
@ -340,7 +359,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, wrappingLogger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
connection.receive({
arguments: ["test"],
@ -370,7 +389,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, wrappingLogger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const handler = () => { };
hubConnection.on("message", handler);
@ -396,7 +415,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let count = 0;
const handler = () => { count++; };
@ -432,7 +451,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let count = 0;
const handler = () => { count++; };
@ -468,7 +487,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let count = 0;
const handler = () => { count++; };
@ -494,7 +513,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let value = "";
hubConnection.on("message", (v) => value = v);
@ -515,13 +534,22 @@ describe("HubConnection", () => {
it("stop on handshake error", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const connection = new TestConnection(false);
const hubConnection = createHubConnection(connection, logger);
try {
let closeError: Error | undefined;
hubConnection.onclose((e) => closeError = e);
connection.receiveHandshakeResponse("Error!");
let startCompleted = false;
const startPromise = hubConnection.start().then(() => startCompleted = true);
expect(startCompleted).toBe(false);
try {
connection.receiveHandshakeResponse("Error!");
} catch {
}
await expect(startPromise)
.rejects
.toThrow("Server returned handshake error: Error!");
expect(closeError!.message).toEqual("Server returned handshake error: Error!");
} finally {
@ -543,7 +571,7 @@ describe("HubConnection", () => {
closeError = e;
});
connection.receiveHandshakeResponse();
await hubConnection.start();
connection.receive({
type: MessageType.Close,
@ -569,7 +597,7 @@ describe("HubConnection", () => {
closeError = e;
});
connection.receiveHandshakeResponse();
await hubConnection.start();
connection.receive({
error: "Error!",
@ -589,7 +617,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let numInvocations1 = 0;
let numInvocations2 = 0;
@ -616,7 +644,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
let numInvocations = 0;
const callback = () => numInvocations++;
@ -674,7 +702,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, wrappingLogger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
hubConnection.on(null!, undefined!);
hubConnection.on(undefined!, null!);
@ -714,11 +742,13 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
hubConnection.stream("testStream", "arg", 42);
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
// Verify the message is sent (+ handshake)
expect(connection.sentData.length).toBe(2);
expect(JSON.parse(connection.sentData[1])).toEqual({
arguments: [
"arg",
42,
@ -741,7 +771,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const observer = new TestObserver();
hubConnection.stream<any>("testMethod", "arg", 42)
@ -761,7 +791,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const observer = new TestObserver();
hubConnection.stream<any>("testMethod", "arg", 42)
@ -782,6 +812,8 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
const observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
@ -800,6 +832,8 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
const observer = new TestObserver();
hubConnection.stream<any>("testMethod")
.subscribe(observer);
@ -819,7 +853,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const observer = new TestObserver();
hubConnection.stream<any>("testMethod")
@ -848,6 +882,7 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
hubConnection.stream("testMethod").subscribe(NullSubscriber.instance);
// Typically this would be called by the transport
@ -865,6 +900,7 @@ describe("HubConnection", () => {
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
hubConnection.stream("testMethod").subscribe(NullSubscriber.instance);
// Send completion to trigger observer.complete()
@ -881,7 +917,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
connection.receiveHandshakeResponse();
await hubConnection.start();
const observer = new TestObserver();
const subscription = hubConnection.stream("testMethod")
@ -896,9 +932,9 @@ describe("HubConnection", () => {
// Observer should no longer receive messages
expect(observer.itemsReceived).toEqual([1]);
// Verify the cancel is sent
expect(connection.sentData.length).toBe(2);
expect(JSON.parse(connection.sentData[1])).toEqual({
// Verify the cancel is sent (+ handshake)
expect(connection.sentData.length).toBe(3);
expect(JSON.parse(connection.sentData[2])).toEqual({
invocationId: connection.lastInvocationId,
type: MessageType.CancelInvocation,
});
@ -986,6 +1022,7 @@ describe("HubConnection", () => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
await hubConnection.start();
const invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: MessageType.Ping });
@ -1025,37 +1062,6 @@ describe("HubConnection", () => {
});
});
it("does not timeout if message was received before HubConnection.start", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
try {
hubConnection.serverTimeoutInMilliseconds = 200;
const p = new PromiseSource<Error>();
hubConnection.onclose((e) => p.resolve(e));
// send message before start to trigger timeout handler
// testing for regression where we didn't cleanup timer if request received before start created a timer
await connection.receive({ type: MessageType.Ping });
await hubConnection.start();
for (let i = 0; i < 6; i++) {
await pingAndWait(connection);
}
await connection.stop();
const error = await p.promise;
expect(error).toBeUndefined();
} finally {
await hubConnection.stop();
}
});
});
it("terminates if no messages received within timeout interval", async () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
@ -1092,11 +1098,14 @@ class TestConnection implements IConnection {
public sentData: any[];
public lastInvocationId: string | null;
constructor() {
private autoHandshake: boolean | null;
constructor(autoHandshake: boolean = true) {
this.onreceive = null;
this.onclose = null;
this.sentData = [];
this.lastInvocationId = null;
this.autoHandshake = autoHandshake;
}
public start(): Promise<void> {
@ -1105,7 +1114,11 @@ class TestConnection implements IConnection {
public send(data: any): Promise<void> {
const invocation = TextMessageFormat.parse(data)[0];
const invocationId = JSON.parse(invocation).invocationId;
const parsedInvocation = JSON.parse(invocation);
const invocationId = parsedInvocation.invocationId;
if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) {
this.receiveHandshakeResponse();
}
if (invocationId) {
this.lastInvocationId = invocationId;
}

View File

@ -65,7 +65,10 @@ describe("HubConnectionBuilder", () => {
// Start the connection
const closed = makeClosedPromise(connection);
await connection.start();
// start waits for handshake before returning, we don't care in this test
// tslint:disable-next-line:no-floating-promises
connection.start();
const pollRequest = await pollSent.promise;
expect(pollRequest.url).toMatch(/http:\/\/example.com\?id=abc123.*/);
@ -109,7 +112,10 @@ describe("HubConnectionBuilder", () => {
// Start the connection
const closed = makeClosedPromise(connection);
await connection.start();
// start waits for handshake before returning, we don't care in this test
// tslint:disable-next-line:no-floating-promises
connection.start();
const negotiateRequest = await negotiateReceived.promise;
expect(negotiateRequest.content).toBe(`{"protocol":"${protocol.name}","version":1}\x1E`);

View File

@ -138,7 +138,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol
case TypePropertyName:
// a handshake response does not have a type
// check the incoming message was not any other type of message
throw new InvalidDataException("Handshake response should not have a 'type' value.");
throw new InvalidDataException("Expected a handshake response from the server.");
case ErrorPropertyName:
error = JsonUtils.ReadAsString(reader, ErrorPropertyName);
break;