From 4dfd93c1d7d1fbbaa58676d79adf0cfab3a0ca59 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 20 Sep 2018 14:23:43 -0700 Subject: [PATCH] [TS] Wait for handshake response (#2983) --- clients/ts/signalr/src/HandshakeProtocol.ts | 6 +- clients/ts/signalr/src/HubConnection.ts | 13 ++ .../ts/signalr/tests/HubConnection.test.ts | 143 ++++++++++-------- .../tests/HubConnectionBuilder.test.ts | 10 +- .../Protocol/HandshakeProtocol.cs | 2 +- 5 files changed, 105 insertions(+), 69 deletions(-) diff --git a/clients/ts/signalr/src/HandshakeProtocol.ts b/clients/ts/signalr/src/HandshakeProtocol.ts index 2b13b9870a..f5f8b47fc6 100644 --- a/clients/ts/signalr/src/HandshakeProtocol.ts +++ b/clients/ts/signalr/src/HandshakeProtocol.ts @@ -55,7 +55,11 @@ export class HandshakeProtocol { // At this point we should have just the single handshake message const messages = TextMessageFormat.parse(messageData); - responseMessage = JSON.parse(messages[0]); + const response = JSON.parse(messages[0]); + if (response.type) { + throw new Error("Expected a handshake response from the server."); + } + responseMessage = response; // multiple messages could have arrived with handshake // return additional data to be parsed as usual, or null if all parsed diff --git a/clients/ts/signalr/src/HubConnection.ts b/clients/ts/signalr/src/HubConnection.ts index 1810bc6da4..2160f21144 100644 --- a/clients/ts/signalr/src/HubConnection.ts +++ b/clients/ts/signalr/src/HubConnection.ts @@ -31,6 +31,8 @@ export class HubConnection { private id: number; private closedCallbacks: Array<(error?: Error) => void>; private receivedHandshakeResponse: boolean; + private handshakeResolver!: (value?: PromiseLike<{}>) => void; + private handshakeRejecter!: (reason?: any) => void; private connectionState: HubConnectionState; // The type of these a) doesn't matter and b) varies when building in browser and node contexts @@ -106,6 +108,11 @@ export class HubConnection { this.logger.log(LogLevel.Debug, "Starting HubConnection."); this.receivedHandshakeResponse = false; + // Set up the promise before any connection is started otherwise it could race with received messages + const handshakePromise = new Promise((resolve, reject) => { + this.handshakeResolver = resolve; + this.handshakeRejecter = reject; + }); await this.connection.start(this.protocol.transferFormat); @@ -120,6 +127,8 @@ export class HubConnection { this.resetTimeoutPeriod(); this.resetKeepAliveInterval(); + // Wait for the handshake to complete before marking connection as connected + await handshakePromise; this.connectionState = HubConnectionState.Connected; } @@ -388,19 +397,23 @@ export class HubConnection { // We don't want to wait on the stop itself. // tslint:disable-next-line:no-floating-promises this.connection.stop(error); + this.handshakeRejecter(error); throw error; } if (responseMessage.error) { const message = "Server returned handshake error: " + responseMessage.error; this.logger.log(LogLevel.Error, message); + this.handshakeRejecter(message); // We don't want to wait on the stop itself. // tslint:disable-next-line:no-floating-promises this.connection.stop(new Error(message)); + throw new Error(message); } else { this.logger.log(LogLevel.Debug, "Server handshake complete."); } + this.handshakeResolver(); return remainingData; } diff --git a/clients/ts/signalr/tests/HubConnection.test.ts b/clients/ts/signalr/tests/HubConnection.test.ts index cbd0a5fddd..6945b88dde 100644 --- a/clients/ts/signalr/tests/HubConnection.test.ts +++ b/clients/ts/signalr/tests/HubConnection.test.ts @@ -23,7 +23,7 @@ registerUnhandledRejectionHandler(); describe("HubConnection", () => { describe("start", () => { - it("sends negotiation message", async () => { + it("sends handshake message", async () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); @@ -163,12 +163,16 @@ describe("HubConnection", () => { protocolCalled = true; }; - const connection = new TestConnection(); + const connection = new TestConnection(false); const hubConnection = createHubConnection(connection, logger, mockProtocol); try { + let startCompleted = false; + const startPromise = hubConnection.start().then(() => startCompleted = true); const data = "{}" + TextMessageFormat.RecordSeparator; + expect(startCompleted).toBe(false); connection.receiveText(data); + await startPromise; // message only contained handshake response expect(protocolCalled).toEqual(false); @@ -187,13 +191,18 @@ describe("HubConnection", () => { protocolCalled = true; }; - const connection = new TestConnection(); + const connection = new TestConnection(false); const hubConnection = createHubConnection(connection, logger, mockProtocol); try { + let startCompleted = false; + const startPromise = hubConnection.start().then(() => startCompleted = true); + expect(startCompleted).toBe(false); + // handshake response + message separator const data = [0x7b, 0x7d, 0x1e]; connection.receiveBinary(new Uint8Array(data).buffer); + await startPromise; // message only contained handshake response expect(protocolCalled).toEqual(false); @@ -210,9 +219,13 @@ describe("HubConnection", () => { const mockProtocol = new TestProtocol(TransferFormat.Binary); mockProtocol.onreceive = (d) => receivedProcotolData = d as ArrayBuffer; - const connection = new TestConnection(); + const connection = new TestConnection(false); const hubConnection = createHubConnection(connection, logger, mockProtocol); try { + let startCompleted = false; + const startPromise = hubConnection.start().then(() => startCompleted = true); + expect(startCompleted).toBe(false); + // handshake response + message separator + message pack message const data = [ 0x7b, 0x7d, 0x1e, 0x65, 0x95, 0x03, 0x80, 0xa1, 0x30, 0x01, 0xd9, 0x5d, 0x54, 0x68, 0x65, 0x20, 0x63, 0x6c, @@ -224,6 +237,7 @@ describe("HubConnection", () => { ]; connection.receiveBinary(new Uint8Array(data).buffer); + await startPromise; // left over data is the message pack message expect(receivedProcotolData!.byteLength).toEqual(102); @@ -240,12 +254,17 @@ describe("HubConnection", () => { const mockProtocol = new TestProtocol(TransferFormat.Text); mockProtocol.onreceive = (d) => receivedProcotolData = d as string; - const connection = new TestConnection(); + const connection = new TestConnection(false); const hubConnection = createHubConnection(connection, logger, mockProtocol); try { + let startCompleted = false; + const startPromise = hubConnection.start().then(() => startCompleted = true); + expect(startCompleted).toBe(false); + const data = "{}" + TextMessageFormat.RecordSeparator + "{\"type\":6}" + TextMessageFormat.RecordSeparator; connection.receiveText(data); + await startPromise; expect(receivedProcotolData).toEqual("{\"type\":6}" + TextMessageFormat.RecordSeparator); } finally { @@ -259,7 +278,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const invokePromise = hubConnection.invoke("testMethod", "arg", 42); @@ -277,7 +296,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const invokePromise = hubConnection.invoke("testMethod", "arg", 42); @@ -296,7 +315,7 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); - connection.receiveHandshakeResponse(); + await hubConnection.start(); const invokePromise = hubConnection.invoke("testMethod"); await hubConnection.stop(); @@ -311,7 +330,7 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const invokePromise = hubConnection.invoke("testMethod"); // Typically this would be called by the transport @@ -340,7 +359,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, wrappingLogger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); connection.receive({ arguments: ["test"], @@ -370,7 +389,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, wrappingLogger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const handler = () => { }; hubConnection.on("message", handler); @@ -396,7 +415,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let count = 0; const handler = () => { count++; }; @@ -432,7 +451,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let count = 0; const handler = () => { count++; }; @@ -468,7 +487,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let count = 0; const handler = () => { count++; }; @@ -494,7 +513,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let value = ""; hubConnection.on("message", (v) => value = v); @@ -515,13 +534,22 @@ describe("HubConnection", () => { it("stop on handshake error", async () => { await VerifyLogger.run(async (logger) => { - const connection = new TestConnection(); + const connection = new TestConnection(false); const hubConnection = createHubConnection(connection, logger); try { let closeError: Error | undefined; hubConnection.onclose((e) => closeError = e); - connection.receiveHandshakeResponse("Error!"); + let startCompleted = false; + const startPromise = hubConnection.start().then(() => startCompleted = true); + expect(startCompleted).toBe(false); + try { + connection.receiveHandshakeResponse("Error!"); + } catch { + } + await expect(startPromise) + .rejects + .toThrow("Server returned handshake error: Error!"); expect(closeError!.message).toEqual("Server returned handshake error: Error!"); } finally { @@ -543,7 +571,7 @@ describe("HubConnection", () => { closeError = e; }); - connection.receiveHandshakeResponse(); + await hubConnection.start(); connection.receive({ type: MessageType.Close, @@ -569,7 +597,7 @@ describe("HubConnection", () => { closeError = e; }); - connection.receiveHandshakeResponse(); + await hubConnection.start(); connection.receive({ error: "Error!", @@ -589,7 +617,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let numInvocations1 = 0; let numInvocations2 = 0; @@ -616,7 +644,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); let numInvocations = 0; const callback = () => numInvocations++; @@ -674,7 +702,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, wrappingLogger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); hubConnection.on(null!, undefined!); hubConnection.on(undefined!, null!); @@ -714,11 +742,13 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); + hubConnection.stream("testStream", "arg", 42); - // Verify the message is sent - expect(connection.sentData.length).toBe(1); - expect(JSON.parse(connection.sentData[0])).toEqual({ + // Verify the message is sent (+ handshake) + expect(connection.sentData.length).toBe(2); + expect(JSON.parse(connection.sentData[1])).toEqual({ arguments: [ "arg", 42, @@ -741,7 +771,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const observer = new TestObserver(); hubConnection.stream("testMethod", "arg", 42) @@ -761,7 +791,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const observer = new TestObserver(); hubConnection.stream("testMethod", "arg", 42) @@ -782,6 +812,8 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); + const observer = new TestObserver(); hubConnection.stream("testMethod") .subscribe(observer); @@ -800,6 +832,8 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); + const observer = new TestObserver(); hubConnection.stream("testMethod") .subscribe(observer); @@ -819,7 +853,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const observer = new TestObserver(); hubConnection.stream("testMethod") @@ -848,6 +882,7 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); hubConnection.stream("testMethod").subscribe(NullSubscriber.instance); // Typically this would be called by the transport @@ -865,6 +900,7 @@ describe("HubConnection", () => { const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); hubConnection.stream("testMethod").subscribe(NullSubscriber.instance); // Send completion to trigger observer.complete() @@ -881,7 +917,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - connection.receiveHandshakeResponse(); + await hubConnection.start(); const observer = new TestObserver(); const subscription = hubConnection.stream("testMethod") @@ -896,9 +932,9 @@ describe("HubConnection", () => { // Observer should no longer receive messages expect(observer.itemsReceived).toEqual([1]); - // Verify the cancel is sent - expect(connection.sentData.length).toBe(2); - expect(JSON.parse(connection.sentData[1])).toEqual({ + // Verify the cancel is sent (+ handshake) + expect(connection.sentData.length).toBe(3); + expect(JSON.parse(connection.sentData[2])).toEqual({ invocationId: connection.lastInvocationId, type: MessageType.CancelInvocation, }); @@ -986,6 +1022,7 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { + await hubConnection.start(); const invokePromise = hubConnection.invoke("testMethod", "arg", 42); connection.receive({ type: MessageType.Ping }); @@ -1025,37 +1062,6 @@ describe("HubConnection", () => { }); }); - it("does not timeout if message was received before HubConnection.start", async () => { - await VerifyLogger.run(async (logger) => { - const connection = new TestConnection(); - const hubConnection = createHubConnection(connection, logger); - try { - hubConnection.serverTimeoutInMilliseconds = 200; - - const p = new PromiseSource(); - hubConnection.onclose((e) => p.resolve(e)); - - // send message before start to trigger timeout handler - // testing for regression where we didn't cleanup timer if request received before start created a timer - await connection.receive({ type: MessageType.Ping }); - - await hubConnection.start(); - - for (let i = 0; i < 6; i++) { - await pingAndWait(connection); - } - - await connection.stop(); - - const error = await p.promise; - - expect(error).toBeUndefined(); - } finally { - await hubConnection.stop(); - } - }); - }); - it("terminates if no messages received within timeout interval", async () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); @@ -1092,11 +1098,14 @@ class TestConnection implements IConnection { public sentData: any[]; public lastInvocationId: string | null; - constructor() { + private autoHandshake: boolean | null; + + constructor(autoHandshake: boolean = true) { this.onreceive = null; this.onclose = null; this.sentData = []; this.lastInvocationId = null; + this.autoHandshake = autoHandshake; } public start(): Promise { @@ -1105,7 +1114,11 @@ class TestConnection implements IConnection { public send(data: any): Promise { const invocation = TextMessageFormat.parse(data)[0]; - const invocationId = JSON.parse(invocation).invocationId; + const parsedInvocation = JSON.parse(invocation); + const invocationId = parsedInvocation.invocationId; + if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) { + this.receiveHandshakeResponse(); + } if (invocationId) { this.lastInvocationId = invocationId; } diff --git a/clients/ts/signalr/tests/HubConnectionBuilder.test.ts b/clients/ts/signalr/tests/HubConnectionBuilder.test.ts index 293c896a54..d2361fa41a 100644 --- a/clients/ts/signalr/tests/HubConnectionBuilder.test.ts +++ b/clients/ts/signalr/tests/HubConnectionBuilder.test.ts @@ -65,7 +65,10 @@ describe("HubConnectionBuilder", () => { // Start the connection const closed = makeClosedPromise(connection); - await connection.start(); + + // start waits for handshake before returning, we don't care in this test + // tslint:disable-next-line:no-floating-promises + connection.start(); const pollRequest = await pollSent.promise; expect(pollRequest.url).toMatch(/http:\/\/example.com\?id=abc123.*/); @@ -109,7 +112,10 @@ describe("HubConnectionBuilder", () => { // Start the connection const closed = makeClosedPromise(connection); - await connection.start(); + + // start waits for handshake before returning, we don't care in this test + // tslint:disable-next-line:no-floating-promises + connection.start(); const negotiateRequest = await negotiateReceived.promise; expect(negotiateRequest.content).toBe(`{"protocol":"${protocol.name}","version":1}\x1E`); diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Protocol/HandshakeProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Protocol/HandshakeProtocol.cs index 3c5294f286..d0c8260a9e 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Protocol/HandshakeProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Protocol/HandshakeProtocol.cs @@ -138,7 +138,7 @@ namespace Microsoft.AspNetCore.SignalR.Protocol case TypePropertyName: // a handshake response does not have a type // check the incoming message was not any other type of message - throw new InvalidDataException("Handshake response should not have a 'type' value."); + throw new InvalidDataException("Expected a handshake response from the server."); case ErrorPropertyName: error = JsonUtils.ReadAsString(reader, ErrorPropertyName); break;