diff --git a/src/SignalR/clients/ts/.vscode/launch.json b/src/SignalR/clients/ts/.vscode/launch.json new file mode 100644 index 0000000000..e12ee26bd7 --- /dev/null +++ b/src/SignalR/clients/ts/.vscode/launch.json @@ -0,0 +1,31 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "attach", + "name": "Node - Attach by Process ID", + "processId": "${command:PickProcess}" + }, + { + "type": "node", + "request": "launch", + "name": "Jest - All", + "program": "${workspaceFolder}/common/node_modules/jest/bin/jest", + "cwd": "${workspaceFolder}", + "args": ["--runInBand"], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen" + }, + { + "type": "node", + "request": "launch", + "name": "Jest - Current File", + "program": "${workspaceFolder}/common/node_modules/jest/bin/jest", + "cwd": "${workspaceFolder}", + "args": ["${fileBasename}"], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen" + } + ] +} diff --git a/src/SignalR/clients/ts/FunctionalTests/SignalR.Npm.FunctionalTests.npmproj b/src/SignalR/clients/ts/FunctionalTests/SignalR.Npm.FunctionalTests.npmproj index bff023ed16..bf021bb209 100644 --- a/src/SignalR/clients/ts/FunctionalTests/SignalR.Npm.FunctionalTests.npmproj +++ b/src/SignalR/clients/ts/FunctionalTests/SignalR.Npm.FunctionalTests.npmproj @@ -26,9 +26,6 @@ - - - + { + if (context.Request.Path.StartsWithSegments("/redirect")) + { + var newUrl = context.Request.Query["baseUrl"] + "/testHub?numRedirects=" + Interlocked.Increment(ref _numRedirects); + return context.Response.WriteAsync($"{{ \"url\": \"{newUrl}\" }}"); + } + + return next(); + }); + app.Use(async (context, next) => { if (context.Request.Path.Value.Contains("/negotiate")) diff --git a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs index fce457491e..980a04653a 100644 --- a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs +++ b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs @@ -34,6 +34,11 @@ namespace FunctionalTests return message; } + public int GetNumRedirects() + { + return int.Parse(Context.GetHttpContext().Request.Query["numRedirects"]); + } + public void ThrowException(string message) { throw new InvalidOperationException(message); diff --git a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts index f2d9aee35a..7bd557f4fe 100644 --- a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts +++ b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts @@ -11,12 +11,15 @@ import { eachTransport, eachTransportAndProtocol, ENDPOINT_BASE_HTTPS_URL, ENDPO import "./LogBannerReporter"; import { TestLogger } from "./TestLogger"; +import { PromiseSource } from "../../signalr/tests/Utils"; + import * as RX from "rxjs"; const TESTHUBENDPOINT_URL = ENDPOINT_BASE_URL + "/testhub"; const TESTHUBENDPOINT_HTTPS_URL = ENDPOINT_BASE_HTTPS_URL ? (ENDPOINT_BASE_HTTPS_URL + "/testhub") : undefined; const TESTHUB_NOWEBSOCKETS_ENDPOINT_URL = ENDPOINT_BASE_URL + "/testhub-nowebsockets"; +const TESTHUB_REDIRECT_ENDPOINT_URL = ENDPOINT_BASE_URL + "/redirect?numRedirects=0&baseUrl=" + ENDPOINT_BASE_URL; const commonOptions: IHttpConnectionOptions = { logMessageContent: true, @@ -421,16 +424,25 @@ describe("hubConnection", () => { }); }); - it("closed with error if hub cannot be created", (done) => { + it("closed with error or start fails if hub cannot be created", async (done) => { const hubConnection = getConnectionBuilder(transportType, ENDPOINT_BASE_URL + "/uncreatable") .withHubProtocol(protocol) .build(); + const expectedErrorMessage = "Server returned an error on close: Connection closed with an error. InvalidOperationException: Unable to resolve service for type 'System.Object' while attempting to activate 'FunctionalTests.UncreatableHub'."; + + // Either start will fail or onclose will be called. Never both. hubConnection.onclose((error) => { - expect(error!.message).toEqual("Server returned an error on close: Connection closed with an error. InvalidOperationException: Unable to resolve service for type 'System.Object' while attempting to activate 'FunctionalTests.UncreatableHub'."); + expect(error!.message).toEqual(expectedErrorMessage); done(); }); - hubConnection.start(); + + try { + await hubConnection.start(); + } catch (error) { + expect(error!.message).toEqual(expectedErrorMessage); + done(); + } }); it("can handle different types", (done) => { @@ -696,9 +708,136 @@ describe("hubConnection", () => { await hubConnection.stop(); done(); }); + + it("can reconnect", async (done) => { + try { + const reconnectingPromise = new PromiseSource(); + const reconnectedPromise = new PromiseSource(); + const hubConnection = getConnectionBuilder(transportType) + .withAutomaticReconnect() + .build(); + + hubConnection.onreconnecting(() => { + reconnectingPromise.resolve(); + }); + + hubConnection.onreconnected((connectionId?) => { + reconnectedPromise.resolve(connectionId); + }); + + await hubConnection.start(); + + const initialConnectionId = (hubConnection as any).connection.connectionId as string; + + // Induce reconnect + (hubConnection as any).serverTimeout(); + + await reconnectingPromise; + const newConnectionId = await reconnectedPromise; + + expect(newConnectionId).not.toBe(initialConnectionId); + + const response = await hubConnection.invoke("Echo", "test"); + + expect(response).toEqual("test"); + + await hubConnection.stop(); + + done(); + } catch (err) { + fail(err); + done(); + } + }); }); }); + it("can reconnect after negotiate redirect", async (done) => { + try { + const reconnectingPromise = new PromiseSource(); + const reconnectedPromise = new PromiseSource(); + const hubConnection = getConnectionBuilder(undefined, TESTHUB_REDIRECT_ENDPOINT_URL) + .withAutomaticReconnect() + .build(); + + hubConnection.onreconnecting(() => { + reconnectingPromise.resolve(); + }); + + hubConnection.onreconnected((connectionId?) => { + reconnectedPromise.resolve(connectionId); + }); + + await hubConnection.start(); + + const preReconnectRedirects = await hubConnection.invoke("GetNumRedirects"); + + const initialConnectionId = (hubConnection as any).connection.connectionId as string; + + // Induce reconnect + (hubConnection as any).serverTimeout(); + + await reconnectingPromise; + const newConnectionId = await reconnectedPromise; + + expect(newConnectionId).not.toBe(initialConnectionId); + + const postReconnectRedirects = await hubConnection.invoke("GetNumRedirects"); + + expect(postReconnectRedirects).toBeGreaterThan(preReconnectRedirects); + + await hubConnection.stop(); + + done(); + } catch (err) { + fail(err); + done(); + } + }); + + it("can reconnect after skipping negotiation", async (done) => { + try { + const reconnectingPromise = new PromiseSource(); + const reconnectedPromise = new PromiseSource(); + const hubConnection = getConnectionBuilder( + HttpTransportType.WebSockets, + undefined, + { skipNegotiation: true }, + ) + .withAutomaticReconnect() + .build(); + + hubConnection.onreconnecting(() => { + reconnectingPromise.resolve(); + }); + + hubConnection.onreconnected((connectionId?) => { + reconnectedPromise.resolve(connectionId); + }); + + await hubConnection.start(); + + // Induce reconnect + (hubConnection as any).serverTimeout(); + + await reconnectingPromise; + const newConnectionId = await reconnectedPromise; + + expect(newConnectionId).toBeUndefined(); + + const response = await hubConnection.invoke("Echo", "test"); + + expect(response).toEqual("test"); + + await hubConnection.stop(); + + done(); + } catch (err) { + fail(err); + done(); + } + }); + if (typeof EventSource !== "undefined") { it("allows Server-Sent Events when negotiating for JSON protocol", async (done) => { const hubConnection = getConnectionBuilder(undefined, TESTHUB_NOWEBSOCKETS_ENDPOINT_URL) diff --git a/src/SignalR/clients/ts/signalr/src/DefaultReconnectPolicy.ts b/src/SignalR/clients/ts/signalr/src/DefaultReconnectPolicy.ts new file mode 100644 index 0000000000..1a5ae81cde --- /dev/null +++ b/src/SignalR/clients/ts/signalr/src/DefaultReconnectPolicy.ts @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +import { IReconnectPolicy } from "./IReconnectPolicy"; + +// 0, 2, 10, 30 second delays before reconnect attempts. +const DEFAULT_RETRY_DELAYS_IN_MILLISECONDS = [0, 2000, 10000, 30000, null]; + +/** @private */ +export class DefaultReconnectPolicy implements IReconnectPolicy { + private readonly retryDelays: Array; + + constructor(retryDelays?: number[]) { + this.retryDelays = retryDelays !== undefined ? [...retryDelays, null] : DEFAULT_RETRY_DELAYS_IN_MILLISECONDS; + } + + public nextRetryDelayInMilliseconds(previousRetryCount: number): number | null { + return this.retryDelays[previousRetryCount]; + } +} diff --git a/src/SignalR/clients/ts/signalr/src/HttpConnection.ts b/src/SignalR/clients/ts/signalr/src/HttpConnection.ts index 4f995f0c3f..cff6b0172f 100644 --- a/src/SignalR/clients/ts/signalr/src/HttpConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HttpConnection.ts @@ -14,9 +14,10 @@ import { WebSocketTransport } from "./WebSocketTransport"; /** @private */ const enum ConnectionState { - Connecting, - Connected, - Disconnected, + Connecting = "Connecting ", + Connected = "Connected", + Disconnected = "Disconnected", + Disconnecting = "Disconnecting", } /** @private */ @@ -49,16 +50,22 @@ if (Platform.isNode && typeof require !== "undefined") { /** @private */ export class HttpConnection implements IConnection { private connectionState: ConnectionState; - private baseUrl: string; + // connectionStarted is tracked independently from connectionState, so we can check if the + // connection ever did successfully transition from connecting to connected before disconnecting. + private connectionStarted: boolean; + private readonly baseUrl: string; private readonly httpClient: HttpClient; private readonly logger: ILogger; private readonly options: IHttpConnectionOptions; private transport?: ITransport; - private startPromise?: Promise; + private startInternalPromise?: Promise; + private stopPromise?: Promise; + private stopPromiseResolver!: (value?: PromiseLike) => void; private stopError?: Error; private accessTokenFactory?: () => string | Promise; public readonly features: any = {}; + public connectionId?: string; public onreceive: ((data: string | ArrayBuffer) => void) | null; public onclose: ((e?: Error) => void) | null; @@ -89,14 +96,16 @@ export class HttpConnection implements IConnection { this.httpClient = options.httpClient || new DefaultHttpClient(this.logger); this.connectionState = ConnectionState.Disconnected; + this.connectionStarted = false; this.options = options; + this.onreceive = null; this.onclose = null; } public start(): Promise; public start(transferFormat: TransferFormat): Promise; - public start(transferFormat?: TransferFormat): Promise { + public async start(transferFormat?: TransferFormat): Promise { transferFormat = transferFormat || TransferFormat.Binary; Arg.isIn(transferFormat, TransferFormat, "transferFormat"); @@ -104,13 +113,32 @@ export class HttpConnection implements IConnection { this.logger.log(LogLevel.Debug, `Starting connection with transfer format '${TransferFormat[transferFormat]}'.`); if (this.connectionState !== ConnectionState.Disconnected) { - return Promise.reject(new Error("Cannot start a connection that is not in the 'Disconnected' state.")); + return Promise.reject(new Error("Cannot start an HttpConnection that is not in the 'Disconnected' state.")); } this.connectionState = ConnectionState.Connecting; - this.startPromise = this.startInternal(transferFormat); - return this.startPromise; + this.startInternalPromise = this.startInternal(transferFormat); + await this.startInternalPromise; + + // The TypeScript compiler thinks that connectionState must be Connecting here. The TypeScript compiler is wrong. + if (this.connectionState as any === ConnectionState.Disconnecting) { + // stop() was called and transitioned the client into the Disconnecting state. + const message = "Failed to start the HttpConnection before stop() was called."; + this.logger.log(LogLevel.Error, message); + + // We cannot await stopPromise inside startInternal since stopInternal awaits the startInternalPromise. + await this.stopPromise; + + return Promise.reject(new Error(message)); + } else if (this.connectionState as any !== ConnectionState.Connected) { + // stop() was called and transitioned the client into the Disconnecting state. + const message = "HttpConnection.startInternal completed gracefully but didn't enter the connection into the connected state!"; + this.logger.log(LogLevel.Error, message); + return Promise.reject(new Error(message)); + } + + this.connectionStarted = true; } public send(data: string | ArrayBuffer): Promise { @@ -123,22 +151,55 @@ export class HttpConnection implements IConnection { } public async stop(error?: Error): Promise { - this.connectionState = ConnectionState.Disconnected; + if (this.connectionState === ConnectionState.Disconnected) { + this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnected state.`); + return Promise.resolve(); + } + + if (this.connectionState === ConnectionState.Disconnecting) { + this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`); + return this.stopPromise; + } + + this.connectionState = ConnectionState.Disconnecting; + + this.stopPromise = new Promise((resolve) => { + // Don't complete stop() until stopConnection() completes. + this.stopPromiseResolver = resolve; + }); + + // stopInternal should never throw so just observe it. + await this.stopInternal(error); + await this.stopPromise; + } + + private async stopInternal(error?: Error): Promise { // Set error as soon as possible otherwise there is a race between // the transport closing and providing an error and the error from a close message // We would prefer the close message error. this.stopError = error; try { - await this.startPromise; + await this.startInternalPromise; } catch (e) { - // this exception is returned to the user as a rejected Promise from the start method + // This exception is returned to the user as a rejected Promise from the start method. } // The transport's onclose will trigger stopConnection which will run our onclose event. + // The transport should always be set if currently connected. If it wasn't set, it's likely because + // stop was called during start() and start() failed. if (this.transport) { - await this.transport.stop(); + try { + await this.transport.stop(); + } catch (e) { + this.logger.log(LogLevel.Error, `HttpConnection.transport.stop() threw error '${e}'.`); + this.stopConnection(); + } + this.transport = undefined; + } else { + this.logger.log(LogLevel.Debug, "HttpConnection.transport is undefined in HttpConnection.stop() because start() failed."); + this.stopConnection(); } } @@ -157,7 +218,7 @@ export class HttpConnection implements IConnection { // No fallback or negotiate in this case. await this.transport!.connect(url, transferFormat); } else { - return Promise.reject(new Error("Negotiation can only be skipped when using the WebSocket transport directly.")); + throw new Error("Negotiation can only be skipped when using the WebSocket transport directly."); } } else { let negotiateResponse: INegotiateResponse | null = null; @@ -166,16 +227,16 @@ export class HttpConnection implements IConnection { do { negotiateResponse = await this.getNegotiationResponse(url); // the user tries to stop the connection when it is being started - if (this.connectionState === ConnectionState.Disconnected) { - return; + if (this.connectionState === ConnectionState.Disconnecting || this.connectionState === ConnectionState.Disconnected) { + throw new Error("The connection was stopped during negotiation."); } if (negotiateResponse.error) { - return Promise.reject(new Error(negotiateResponse.error)); + throw new Error(negotiateResponse.error); } if ((negotiateResponse as any).ProtocolVersion) { - return Promise.reject(new Error("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details.")); + throw new Error("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details."); } if (negotiateResponse.url) { @@ -194,9 +255,11 @@ export class HttpConnection implements IConnection { while (negotiateResponse.url && redirects < MAX_REDIRECTS); if (redirects === MAX_REDIRECTS && negotiateResponse.url) { - return Promise.reject(new Error("Negotiate redirection limit exceeded.")); + throw new Error("Negotiate redirection limit exceeded."); } + this.connectionId = negotiateResponse.connectionId; + await this.createTransport(url, this.options.transport, negotiateResponse, transferFormat); } @@ -207,9 +270,16 @@ export class HttpConnection implements IConnection { this.transport!.onreceive = this.onreceive; this.transport!.onclose = (e) => this.stopConnection(e); - // only change the state if we were connecting to not overwrite - // the state if the connection is already marked as Disconnected - this.changeState(ConnectionState.Connecting, ConnectionState.Connected); + if (this.connectionState === ConnectionState.Connecting) { + // Ensure the connection transitions to the connected state prior to completing this.startInternalPromise. + // start() will handle the case when stop was called and startInternal exits still in the disconnecting state. + this.logger.log(LogLevel.Debug, "The HttpConnection connected successfully."); + this.connectionState = ConnectionState.Connected; + } + + // stop() is waiting on us via this.startInternalPromise so keep this.transport around so it can clean up. + // This is the only case startInternal can exit in neither the connected nor disconnected state because stopConnection() + // will transition to the disconnected state. start() will wait for the transition using the stopPromise. } catch (e) { this.logger.log(LogLevel.Error, "Failed to start the connection: " + e); this.connectionState = ConnectionState.Disconnected; @@ -262,9 +332,6 @@ export class HttpConnection implements IConnection { this.transport = requestedTransport; await this.transport.connect(connectUrl, requestedTransferFormat); - // only change the state if we were connecting to not overwrite - // the state if the connection is already marked as Disconnected - this.changeState(ConnectionState.Connecting, ConnectionState.Connected); return; } @@ -272,7 +339,6 @@ export class HttpConnection implements IConnection { const transports = negotiateResponse.availableTransports || []; for (const endpoint of transports) { try { - this.connectionState = ConnectionState.Connecting; const transport = this.resolveTransport(endpoint, requestedTransport, requestedTransferFormat); if (typeof transport === "number") { this.transport = this.constructTransport(transport); @@ -281,14 +347,18 @@ export class HttpConnection implements IConnection { connectUrl = this.createConnectUrl(url, negotiateResponse.connectionId); } await this.transport!.connect(connectUrl, requestedTransferFormat); - this.changeState(ConnectionState.Connecting, ConnectionState.Connected); return; } } catch (ex) { this.logger.log(LogLevel.Error, `Failed to start the transport '${endpoint.transport}': ${ex}`); - this.connectionState = ConnectionState.Disconnected; negotiateResponse.connectionId = undefined; transportExceptions.push(`${endpoint.transport} failed: ${ex}`); + + if (this.connectionState !== ConnectionState.Connecting) { + const message = "Failed to select transport before stop() was called."; + this.logger.log(LogLevel.Debug, message); + return Promise.reject(new Error(message)); + } } } @@ -349,19 +419,30 @@ export class HttpConnection implements IConnection { return transport && typeof (transport) === "object" && "connect" in transport; } - private changeState(from: ConnectionState, to: ConnectionState): boolean { - if (this.connectionState === from) { - this.connectionState = to; - return true; - } - return false; - } - private stopConnection(error?: Error): void { + this.logger.log(LogLevel.Debug, `HttpConnection.stopConnection(${error}) called while in state ${this.connectionState}.`); + this.transport = undefined; // If we have a stopError, it takes precedence over the error from the transport error = this.stopError || error; + this.stopError = undefined; + + if (this.connectionState === ConnectionState.Disconnected) { + this.logger.log(LogLevel.Debug, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection is already in the disconnected state.`); + return; + } + + if (this.connectionState === ConnectionState.Connecting) { + this.logger.log(LogLevel.Warning, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection hasn't yet left the in the connecting state.`); + return; + } + + if (this.connectionState === ConnectionState.Disconnecting) { + // A call to stop() induced this call to stopConnection and needs to be completed. + // Any stop() awaiters will be scheduled to continue after the onclose callback fires. + this.stopPromiseResolver(); + } if (error) { this.logger.log(LogLevel.Error, `Connection disconnected with error '${error}'.`); @@ -371,8 +452,14 @@ export class HttpConnection implements IConnection { this.connectionState = ConnectionState.Disconnected; - if (this.onclose) { - this.onclose(error); + if (this.onclose && this.connectionStarted) { + this.connectionStarted = false; + + try { + this.onclose(error); + } catch (e) { + this.logger.log(LogLevel.Error, `HttpConnection.onclose(${error}) threw error '${e}'.`); + } } } diff --git a/src/SignalR/clients/ts/signalr/src/HubConnection.ts b/src/SignalR/clients/ts/signalr/src/HubConnection.ts index 1c6847733a..baab33767d 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnection.ts @@ -5,6 +5,7 @@ import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } import { IConnection } from "./IConnection"; import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol"; import { ILogger, LogLevel } from "./ILogger"; +import { IReconnectPolicy } from "./IReconnectPolicy"; import { IStreamResult } from "./Stream"; import { Subject } from "./Subject"; import { Arg } from "./Utils"; @@ -15,9 +16,15 @@ const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000; /** Describes the current state of the {@link HubConnection} to the server. */ export enum HubConnectionState { /** The hub connection is disconnected. */ - Disconnected, + Disconnected = "Disconnected", + /** The hub connection is connecting. */ + Connecting = "Connecting", /** The hub connection is connected. */ - Connected, + Connected = "Connected", + /** The hub connection is disconnecting. */ + Disconnecting = "Disconnecting", + /** The hub connection is reconnecting. */ + Reconnecting = "Reconnecting", } /** Represents a connection to a SignalR Hub. */ @@ -25,20 +32,33 @@ export class HubConnection { private readonly cachedPingMessage: string | ArrayBuffer; private readonly connection: IConnection; private readonly logger: ILogger; + private readonly reconnectPolicy?: IReconnectPolicy; private protocol: IHubProtocol; private handshakeProtocol: HandshakeProtocol; private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void }; private methods: { [name: string]: Array<(...args: any[]) => void> }; private invocationId: number; + private closedCallbacks: Array<(error?: Error) => void>; + private reconnectingCallbacks: Array<(error?: Error) => void>; + private reconnectedCallbacks: Array<(connectionId?: string) => void>; + private receivedHandshakeResponse: boolean; private handshakeResolver!: (value?: PromiseLike<{}>) => void; private handshakeRejecter!: (reason?: any) => void; + private stopDuringStartError?: Error; + private connectionState: HubConnectionState; + // connectionStarted is tracked independently from connectionState, so we can check if the + // connection ever did successfully transition from connecting to connected before disconnecting. + private connectionStarted: boolean; + private startPromise?: Promise; + private stopPromise?: Promise; // The type of these a) doesn't matter and b) varies when building in browser and node contexts // Since we're building the WebPack bundle directly from the TypeScript, this matters (previously // we built the bundle from the compiled JavaScript). + private reconnectDelayHandle?: any; private timeoutHandle?: any; private pingServerHandle?: any; @@ -61,11 +81,11 @@ export class HubConnection { // create method that can be used by HubConnectionBuilder. An "internal" constructor would just // be stripped away and the '.d.ts' file would have no constructor, which is interpreted as a // public parameter-less constructor. - public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol): HubConnection { - return new HubConnection(connection, logger, protocol); + public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IReconnectPolicy): HubConnection { + return new HubConnection(connection, logger, protocol, reconnectPolicy); } - private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol) { + private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IReconnectPolicy) { Arg.isRequired(connection, "connection"); Arg.isRequired(logger, "logger"); Arg.isRequired(protocol, "protocol"); @@ -76,6 +96,7 @@ export class HubConnection { this.logger = logger; this.protocol = protocol; this.connection = connection; + this.reconnectPolicy = reconnectPolicy; this.handshakeProtocol = new HandshakeProtocol(); this.connection.onreceive = (data: any) => this.processIncomingData(data); @@ -84,9 +105,12 @@ export class HubConnection { this.callbacks = {}; this.methods = {}; this.closedCallbacks = []; + this.reconnectingCallbacks = []; + this.reconnectedCallbacks = []; this.invocationId = 0; this.receivedHandshakeResponse = false; this.connectionState = HubConnectionState.Disconnected; + this.connectionStarted = false; this.cachedPingMessage = this.protocol.writeMessage({ type: MessageType.Ping }); } @@ -100,16 +124,36 @@ export class HubConnection { * * @returns {Promise} A Promise that resolves when the connection has been successfully established, or rejects with an error. */ - public async start(): Promise { - const handshakeRequest: HandshakeRequestMessage = { - protocol: this.protocol.name, - version: this.protocol.version, - }; + public start(): Promise { + this.startPromise = this.startWithStateTransitions(); + return this.startPromise; + } + private async startWithStateTransitions(): Promise { + if (this.connectionState !== HubConnectionState.Disconnected) { + return Promise.reject(new Error("Cannot start a HubConnection that is not in the 'Disconnected' state.")); + } + + this.connectionState = HubConnectionState.Connecting; this.logger.log(LogLevel.Debug, "Starting HubConnection."); + try { + await this.startInternal(); + + this.connectionState = HubConnectionState.Connected; + this.connectionStarted = true; + this.logger.log(LogLevel.Debug, "HubConnection connected successfully."); + } catch (e) { + this.connectionState = HubConnectionState.Disconnected; + this.logger.log(LogLevel.Debug, `HubConnection failed to start successfully because of error '${e}'.`); + return Promise.reject(e); + } + } + + private async startInternal() { + this.stopDuringStartError = undefined; this.receivedHandshakeResponse = false; - // Set up the promise before any connection is started otherwise it could race with received messages + // Set up the promise before any connection is (re)started otherwise it could race with received messages const handshakePromise = new Promise((resolve, reject) => { this.handshakeResolver = resolve; this.handshakeRejecter = reject; @@ -117,32 +161,96 @@ export class HubConnection { await this.connection.start(this.protocol.transferFormat); - this.logger.log(LogLevel.Debug, "Sending handshake request."); + try { + const handshakeRequest: HandshakeRequestMessage = { + protocol: this.protocol.name, + version: this.protocol.version, + }; - await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest)); + this.logger.log(LogLevel.Debug, "Sending handshake request."); - this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`); + await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest)); - // defensively cleanup timeout in case we receive a message from the server before we finish start - this.cleanupTimeout(); - this.resetTimeoutPeriod(); - this.resetKeepAliveInterval(); + this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`); - // Wait for the handshake to complete before marking connection as connected - await handshakePromise; - this.connectionState = HubConnectionState.Connected; + // defensively cleanup timeout in case we receive a message from the server before we finish start + this.cleanupTimeout(); + this.resetTimeoutPeriod(); + this.resetKeepAliveInterval(); + + await handshakePromise; + + if (this.stopDuringStartError) { + throw this.stopDuringStartError; + } + } catch (e) { + this.logger.log(LogLevel.Debug, `Hub handshake failed with error '${e}' during start(). Stopping HubConnection.`); + + this.cleanupTimeout(); + this.cleanupPingTimer(); + + // HttpConnection.stop() should not complete until after the onclose callback is invoked. + // This will transition the HubConnection to the disconnected state before HttpConnection.stop() completes. + await this.connection.stop(e); + throw e; + } } /** Stops the connection. * * @returns {Promise} A Promise that resolves when the connection has been successfully terminated, or rejects with an error. */ - public stop(): Promise { + public async stop(): Promise { + // Capture the start promise before the connection might be restarted in an onclose callback. + const startPromise = this.startPromise; + + this.stopPromise = this.stopInternal(); + await this.stopPromise; + + try { + // Awaiting undefined continues immediately + await startPromise; + } catch (e) { + // This exception is returned to the user as a rejected Promise from the start method. + } + } + + private stopInternal(error?: Error): Promise { + if (this.connectionState === HubConnectionState.Disconnected) { + this.logger.log(LogLevel.Debug, `Call to HubConnection.stop(${error}) ignored because it is already in the disconnected state.`); + return Promise.resolve(); + } + + if (this.connectionState === HubConnectionState.Disconnecting) { + this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`); + return this.stopPromise!; + } + + this.connectionState = HubConnectionState.Disconnecting; + this.logger.log(LogLevel.Debug, "Stopping HubConnection."); + if (this.reconnectDelayHandle) { + // We're in a reconnect delay which means the underlying connection is currently already stopped. + // Just clear the handle to stop the reconnect loop (which no one is waiting on thankfully) and + // fire the onclose callbacks. + this.logger.log(LogLevel.Debug, "Connection stopped during reconnect delay. Done reconnecting."); + + clearTimeout(this.reconnectDelayHandle); + this.reconnectDelayHandle = undefined; + + this.completeClose(); + return Promise.resolve(); + } + this.cleanupTimeout(); this.cleanupPingTimer(); - return this.connection.stop(); + this.stopDuringStartError = error || new Error("The connection was stopped before the hub handshake could complete."); + + // HttpConnection.stop() should not complete until after either HttpConnection.start() fails + // or the onclose callback is invoked. The onclose callback will transition the HubConnection + // to the disconnected state if need be before HttpConnection.stop() completes. + return this.connection.stop(error); } /** Invokes a streaming hub method on the server using the specified name and arguments. @@ -348,6 +456,26 @@ export class HubConnection { } } + /** Registers a handler that will be invoked when the connection starts reconnecting. + * + * @param {Function} callback The handler that will be invoked when the connection starts reconnecting. Optionally receives a single argument containing the error that caused the connection to start reconnecting (if any). + */ + public onreconnecting(callback: (error?: Error) => void) { + if (callback) { + this.reconnectingCallbacks.push(callback); + } + } + + /** Registers a handler that will be invoked when the connection successfully reconnects. + * + * @param {Function} callback The handler that will be invoked when the connection successfully reconnects. + */ + public onreconnected(callback: (connectionId?: string) => void) { + if (callback) { + this.reconnectedCallbacks.push(callback); + } + } + private processIncomingData(data: any) { this.cleanupTimeout(); @@ -369,7 +497,7 @@ export class HubConnection { case MessageType.StreamItem: case MessageType.Completion: const callback = this.callbacks[message.invocationId]; - if (callback != null) { + if (callback) { if (message.type === MessageType.Completion) { delete this.callbacks[message.invocationId]; } @@ -383,8 +511,7 @@ export class HubConnection { this.logger.log(LogLevel.Information, "Close message received from server."); // We don't want to wait on the stop itself. - // tslint:disable-next-line:no-floating-promises - this.connection.stop(message.error ? new Error("Server returned an error on close: " + message.error) : undefined); + this.stopPromise = this.stopInternal(message.error ? new Error("Server returned an error on close: " + message.error) : undefined); break; default: @@ -408,10 +535,6 @@ export class HubConnection { this.logger.log(LogLevel.Error, message); const error = new Error(message); - - // We don't want to wait on the stop itself. - // tslint:disable-next-line:no-floating-promises - this.connection.stop(error); this.handshakeRejecter(error); throw error; } @@ -419,11 +542,9 @@ export class HubConnection { const message = "Server returned handshake error: " + responseMessage.error; this.logger.log(LogLevel.Error, message); - this.handshakeRejecter(message); - // We don't want to wait on the stop itself. - // tslint:disable-next-line:no-floating-promises - this.connection.stop(new Error(message)); - throw new Error(message); + const error = new Error(message); + this.handshakeRejecter(error); + throw error; } else { this.logger.log(LogLevel.Debug, "Server handshake complete."); } @@ -456,7 +577,7 @@ export class HubConnection { private serverTimeout() { // The server hasn't talked to us in a while. It doesn't like us anymore ... :( - // Terminate the connection, but we don't need to wait on the promise. + // Terminate the connection, but we don't need to wait on the promise. This could trigger reconnecting. // tslint:disable-next-line:no-floating-promises this.connection.stop(new Error("Server timeout elapsed without receiving a message from the server.")); } @@ -464,15 +585,19 @@ export class HubConnection { private invokeClientMethod(invocationMessage: InvocationMessage) { const methods = this.methods[invocationMessage.target.toLowerCase()]; if (methods) { - methods.forEach((m) => m.apply(this, invocationMessage.arguments)); + try { + methods.forEach((m) => m.apply(this, invocationMessage.arguments)); + } catch (e) { + this.logger.log(LogLevel.Error, `A callback for the method ${invocationMessage.target.toLowerCase()} threw error '${e}'.`); + } + if (invocationMessage.invocationId) { // This is not supported in v1. So we return an error to avoid blocking the server waiting for the response. const message = "Server requested a response, which is not supported in this version of the client."; this.logger.log(LogLevel.Error, message); - // We don't need to wait on this Promise. - // tslint:disable-next-line:no-floating-promises - this.connection.stop(new Error(message)); + // We don't want to wait on the stop itself. + this.stopPromise = this.stopInternal(new Error(message)); } } else { this.logger.log(LogLevel.Warning, `No client method with the name '${invocationMessage.target}' found.`); @@ -480,27 +605,148 @@ export class HubConnection { } private connectionClosed(error?: Error) { - const callbacks = this.callbacks; - this.callbacks = {}; + this.logger.log(LogLevel.Debug, `HubConnection.connectionClosed(${error}) called while in state ${this.connectionState}.`); - this.connectionState = HubConnectionState.Disconnected; + // Triggering this.handshakeRejecter is insufficient because it could already be resolved without the continuation having run yet. + this.stopDuringStartError = this.stopDuringStartError || error || new Error("The underlying connection was closed before the hub handshake could complete."); - // if handshake is in progress start will be waiting for the handshake promise, so we complete it - // if it has already completed this should just noop - if (this.handshakeRejecter) { - this.handshakeRejecter(error); + // If the handshake is in progress, start will be waiting for the handshake promise, so we complete it. + // If it has already completed, this should just noop. + if (this.handshakeResolver) { + this.handshakeResolver(); } - Object.keys(callbacks) - .forEach((key) => { - const callback = callbacks[key]; - callback(null, error ? error : new Error("Invocation canceled due to connection being closed.")); - }); + this.cancelCallbacksWithError(error || new Error("Invocation canceled due to the underlying connection being closed.")); this.cleanupTimeout(); this.cleanupPingTimer(); - this.closedCallbacks.forEach((c) => c.apply(this, [error])); + if (this.connectionState === HubConnectionState.Disconnecting) { + this.completeClose(error); + } else if (this.connectionState === HubConnectionState.Connected && this.reconnectPolicy) { + // tslint:disable-next-line:no-floating-promises + this.reconnect(error); + } else if (this.connectionState === HubConnectionState.Connected) { + this.completeClose(error); + } + + // If none of the above if conditions were true were called the HubConnection must be in either: + // 1. The Connecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail it. + // 2. The Reconnecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail the current reconnect attempt + // and potentially continue the reconnect() loop. + // 3. The Disconnected state in which case we're already done. + } + + private completeClose(error?: Error) { + if (this.connectionStarted) { + this.connectionState = HubConnectionState.Disconnected; + this.connectionStarted = false; + + try { + this.closedCallbacks.forEach((c) => c.apply(this, [error])); + } catch (e) { + this.logger.log(LogLevel.Error, `An onclose callback called with error '${error}' threw error '${e}'.`); + } + } + } + + private async reconnect(error?: Error) { + const reconnectStartTime = Date.now(); + let previousReconnectAttempts = 0; + + let nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, 0); + + if (nextRetryDelay === null) { + this.logger.log(LogLevel.Debug, "Connection not reconnecting because the IReconnectPolicy returned null on the first reconnect attempt."); + this.completeClose(error); + return; + } + + this.connectionState = HubConnectionState.Reconnecting; + + if (error) { + this.logger.log(LogLevel.Information, `Connection reconnecting because of error '${error}'.`); + } else { + this.logger.log(LogLevel.Information, "Connection reconnecting."); + } + + if (this.onreconnecting) { + try { + this.reconnectingCallbacks.forEach((c) => c.apply(this, [error])); + } catch (e) { + this.logger.log(LogLevel.Error, `An onreconnecting callback called with error '${error}' threw error '${e}'.`); + } + + // Exit early if an onreconnecting callback called connection.stop(). + if (this.connectionState !== HubConnectionState.Reconnecting) { + this.logger.log(LogLevel.Debug, "Connection left the reconnecting state in onreconnecting callback. Done reconnecting."); + return; + } + } + + while (nextRetryDelay !== null) { + this.logger.log(LogLevel.Information, `Reconnect attempt number ${previousReconnectAttempts} will start in ${nextRetryDelay} ms.`); + + await new Promise((resolve) => { + this.reconnectDelayHandle = setTimeout(resolve, nextRetryDelay!); + }); + this.reconnectDelayHandle = undefined; + + if (this.connectionState !== HubConnectionState.Reconnecting) { + this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect delay. Done reconnecting."); + return; + } + + try { + await this.startInternal(); + + this.connectionState = HubConnectionState.Connected; + this.logger.log(LogLevel.Information, "HubConnection reconnected successfully."); + + if (this.onreconnected) { + try { + this.reconnectedCallbacks.forEach((c) => c.apply(this, [this.connection.connectionId])); + } catch (e) { + this.logger.log(LogLevel.Error, `An onreconnected callback called with connectionId '${this.connection.connectionId}; threw error '${e}'.`); + } + } + + return; + } catch (e) { + this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`); + + if (this.connectionState !== HubConnectionState.Reconnecting) { + this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting."); + return; + } + } + + nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, Date.now() - reconnectStartTime); + } + + this.logger.log(LogLevel.Information, `Reconnect retries have been exhausted after ${Date.now() - reconnectStartTime} ms and ${previousReconnectAttempts} failed attempts. Connection disconnecting.`); + + this.completeClose(); + } + + private getNextRetryDelay(previousRetryCount: number, elapsedMilliseconds: number) { + try { + return this.reconnectPolicy!.nextRetryDelayInMilliseconds(previousRetryCount, elapsedMilliseconds); + } catch (e) { + this.logger.log(LogLevel.Error, `IReconnectPolicy.nextRetryDelayInMilliseconds(${previousRetryCount}, ${elapsedMilliseconds}) threw error '${e}'.`); + return null; + } + } + + private cancelCallbacksWithError(error: Error) { + const callbacks = this.callbacks; + this.callbacks = {}; + + Object.keys(callbacks) + .forEach((key) => { + const callback = callbacks[key]; + callback(null, error); + }); } private cleanupPingTimer(): void { diff --git a/src/SignalR/clients/ts/signalr/src/HubConnectionBuilder.ts b/src/SignalR/clients/ts/signalr/src/HubConnectionBuilder.ts index 773ab3aa00..53940d9769 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnectionBuilder.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnectionBuilder.ts @@ -1,11 +1,13 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +import { DefaultReconnectPolicy } from "./DefaultReconnectPolicy"; import { HttpConnection } from "./HttpConnection"; import { HubConnection } from "./HubConnection"; import { IHttpConnectionOptions } from "./IHttpConnectionOptions"; import { IHubProtocol } from "./IHubProtocol"; import { ILogger, LogLevel } from "./ILogger"; +import { IReconnectPolicy } from "./IReconnectPolicy"; import { HttpTransportType } from "./ITransport"; import { JsonHubProtocol } from "./JsonHubProtocol"; import { NullLogger } from "./Loggers"; @@ -22,6 +24,10 @@ export class HubConnectionBuilder { /** @internal */ public logger?: ILogger; + /** If defined, this indicates the client should automatically attempt to reconnect if the connection is lost. */ + /** @internal */ + public reconnectPolicy?: IReconnectPolicy; + /** Configures console logging for the {@link @aspnet/signalr.HubConnection}. * * @param {LogLevel} logLevel The minimum level of messages to log. Anything at this level, or a more severe level, will be logged. @@ -35,6 +41,7 @@ export class HubConnectionBuilder { * @returns The {@link @aspnet/signalr.HubConnectionBuilder} instance, for chaining. */ public configureLogging(logger: ILogger): HubConnectionBuilder; + /** Configures custom logging for the {@link @aspnet/signalr.HubConnection}. * * @param {LogLevel | ILogger} logging An object implementing the {@link @aspnet/signalr.ILogger} interface or {@link @aspnet/signalr.LogLevel}. @@ -85,9 +92,10 @@ export class HubConnectionBuilder { // Flow-typing knows where it's at. Since HttpTransportType is a number and IHttpConnectionOptions is guaranteed // to be an object, we know (as does TypeScript) this comparison is all we need to figure out which overload was called. if (typeof transportTypeOrOptions === "object") { - this.httpConnectionOptions = transportTypeOrOptions; + this.httpConnectionOptions = {...this.httpConnectionOptions, ...transportTypeOrOptions}; } else { this.httpConnectionOptions = { + ...this.httpConnectionOptions, transport: transportTypeOrOptions, }; } @@ -106,6 +114,39 @@ export class HubConnectionBuilder { return this; } + /** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost. + * By default, the client will wait 0, 2, 10 and 30 seconds respectively before trying up to 4 reconnect attempts. + */ + public withAutomaticReconnect(): HubConnectionBuilder; + + /** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost. + * + * @param {number[]} retryDelays An array containing the delays in milliseconds before trying each reconnect attempt. + * The length of the array represents how many failed reconnect attempts it takes before the client will stop attempting to reconnect. + */ + public withAutomaticReconnect(retryDelays: number[]): HubConnectionBuilder; + + /** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost. + * + * @param {number[]} reconnectPolicy An {@link @aspnet/signalR.IReconnectPolicy} that controls the timing and number of reconnect attempts. + */ + public withAutomaticReconnect(reconnectPolicy: IReconnectPolicy): HubConnectionBuilder; + public withAutomaticReconnect(retryDelaysOrReconnectPolicy?: number[] | IReconnectPolicy): HubConnectionBuilder { + if (this.reconnectPolicy) { + throw new Error("A reconnectPolicy has already been set."); + } + + if (!retryDelaysOrReconnectPolicy) { + this.reconnectPolicy = new DefaultReconnectPolicy(); + } else if (Array.isArray(retryDelaysOrReconnectPolicy)) { + this.reconnectPolicy = new DefaultReconnectPolicy(retryDelaysOrReconnectPolicy); + } else { + this.reconnectPolicy = retryDelaysOrReconnectPolicy; + } + + return this; + } + /** Creates a {@link @aspnet/signalr.HubConnection} from the configuration options specified in this builder. * * @returns {HubConnection} The configured {@link @aspnet/signalr.HubConnection}. @@ -130,7 +171,8 @@ export class HubConnectionBuilder { return HubConnection.create( connection, this.logger || NullLogger.instance, - this.protocol || new JsonHubProtocol()); + this.protocol || new JsonHubProtocol(), + this.reconnectPolicy); } } diff --git a/src/SignalR/clients/ts/signalr/src/IConnection.ts b/src/SignalR/clients/ts/signalr/src/IConnection.ts index 96f27830e9..beccd3cbcb 100644 --- a/src/SignalR/clients/ts/signalr/src/IConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/IConnection.ts @@ -6,6 +6,7 @@ import { TransferFormat } from "./ITransport"; /** @private */ export interface IConnection { readonly features: any; + readonly connectionId?: string; start(transferFormat: TransferFormat): Promise; send(data: string | ArrayBuffer): Promise; diff --git a/src/SignalR/clients/ts/signalr/src/IReconnectPolicy.ts b/src/SignalR/clients/ts/signalr/src/IReconnectPolicy.ts new file mode 100644 index 0000000000..7f316aa2ca --- /dev/null +++ b/src/SignalR/clients/ts/signalr/src/IReconnectPolicy.ts @@ -0,0 +1,15 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +/** An abstraction that controls when the client attempts to reconnect and how many times it does so. */ +export interface IReconnectPolicy { + /** Called after the transport loses the connection. + * + * @param {number} previousRetryCount The number of consecutive failed reconnect attempts so far. + * + * @param {number} elapsedMilliseconds The amount of time in milliseconds spent reconnecting so far. + * + * @returns {number | null} The amount of time in milliseconds to wait before the next reconnect attempt. `null` tells the client to stop retrying and close. + */ + nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number): number | null; +} diff --git a/src/SignalR/clients/ts/signalr/src/index.ts b/src/SignalR/clients/ts/signalr/src/index.ts index e1479e72e5..98cb0106d6 100644 --- a/src/SignalR/clients/ts/signalr/src/index.ts +++ b/src/SignalR/clients/ts/signalr/src/index.ts @@ -21,3 +21,4 @@ export { IStreamSubscriber, IStreamResult, ISubscription } from "./Stream"; export { NullLogger } from "./Loggers"; export { JsonHubProtocol } from "./JsonHubProtocol"; export { Subject } from "./Subject"; +export { IReconnectPolicy } from "./IReconnectPolicy"; diff --git a/src/SignalR/clients/ts/signalr/tests/HttpConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HttpConnection.test.ts index 0c449f3c74..39e44ef03d 100644 --- a/src/SignalR/clients/ts/signalr/tests/HttpConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HttpConnection.test.ts @@ -65,14 +65,10 @@ describe("HttpConnection", () => { it("cannot start a running connection", async () => { await VerifyLogger.run(async (logger) => { - const negotiating = new PromiseSource(); const options: IHttpConnectionOptions = { ...commonOptions, httpClient: new TestHttpClient() - .on("POST", () => { - negotiating.resolve(); - return defaultNegotiateResponse; - }), + .on("POST", () => defaultNegotiateResponse), logger, transport: { connect() { @@ -95,8 +91,9 @@ describe("HttpConnection", () => { await expect(connection.start(TransferFormat.Text)) .rejects - .toThrow("Cannot start a connection that is not in the 'Disconnected' state."); + .toThrow("Cannot start an HttpConnection that is not in the 'Disconnected' state."); } finally { + (options.transport as ITransport).onclose!(); await connection.stop(); } }); @@ -146,8 +143,11 @@ describe("HttpConnection", () => { const connection = new HttpConnection("http://tempuri.org", options); - await connection.start(TransferFormat.Text); - }); + await expect(connection.start(TransferFormat.Text)) + .rejects + .toThrow("The connection was stopped during negotiation."); + }, + "Failed to start the connection: Error: The connection was stopped during negotiation."); }); it("cannot send with an un-started connection", async () => { @@ -277,6 +277,7 @@ describe("HttpConnection", () => { await startPromise; } finally { + (options.transport as ITransport).onclose!(); await connection.stop(); } }); @@ -306,7 +307,7 @@ describe("HttpConnection", () => { expect(await negotiateUrl).toBe(expectedUrl); - await expect(startPromise).rejects; + await expect(startPromise).rejects.toThrow("We don't care how this turns out"); } finally { await connection.stop(); } @@ -805,7 +806,7 @@ describe("HttpConnection", () => { // Force TypeScript to let us call start incorrectly const connection: any = new HttpConnection("http://tempuri.org", { ...commonOptions, logger }); - expect(() => connection.start(42)).toThrowError("Unknown transferFormat value: 42."); + await expect(connection.start(42)).rejects.toThrow("Unknown transferFormat value: 42."); }); }); diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.Reconnect.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.Reconnect.test.ts new file mode 100644 index 0000000000..9b1d3b34e7 --- /dev/null +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.Reconnect.test.ts @@ -0,0 +1,702 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy"; +import { HubConnection, HubConnectionState } from "../src/HubConnection"; +import { JsonHubProtocol } from "../src/JsonHubProtocol"; + +import { VerifyLogger } from "./Common"; +import { TestConnection } from "./TestConnection"; +import { PromiseSource } from "./Utils"; + +describe("auto reconnect", () => { + it("is not enabled by default", async () => { + await VerifyLogger.run(async (logger) => { + const closePromise = new PromiseSource(); + let onreconnectingCalled = false; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol()); + + hubConnection.onclose(() => { + closePromise.resolve(); + }); + + hubConnection.onreconnecting(() => { + onreconnectingCalled = true; + }); + + await hubConnection.start(); + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await closePromise; + + expect(onreconnectingCalled).toBe(false); + }); + }); + + it("can be opted into", async () => { + await VerifyLogger.run(async (logger) => { + const reconnectedPromise = new PromiseSource(); + + let nextRetryDelayCalledPromise = new PromiseSource(); + let continueRetryingPromise = new PromiseSource(); + + let lastRetryCount = -1; + let lastElapsedMs = -1; + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) { + lastRetryCount = previousRetryCount; + lastElapsedMs = elapsedMilliseconds; + nextRetryDelayCalledPromise.resolve(); + return 0; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + reconnectedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + await hubConnection.start(); + + connection.start = () => { + const promise = continueRetryingPromise; + continueRetryingPromise = new PromiseSource(); + return promise; + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + nextRetryDelayCalledPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + // Make sure the the Promise is "handled" immediately upon rejection or else this test fails. + continueRetryingPromise.catch(() => { }); + continueRetryingPromise.reject(new Error("Reconnect attempt failed")); + await nextRetryDelayCalledPromise; + + expect(lastRetryCount).toBe(1); + expect(lastElapsedMs).toBeGreaterThanOrEqual(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + continueRetryingPromise.resolve(); + await reconnectedPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Connected); + expect(lastRetryCount).toBe(1); + expect(lastElapsedMs).toBeGreaterThanOrEqual(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(0); + + await hubConnection.stop(); + + expect(lastRetryCount).toBe(1); + expect(lastElapsedMs).toBeGreaterThanOrEqual(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(1); + }); + }); + + it("stops if the reconnect policy returns null", async () => { + await VerifyLogger.run(async (logger) => { + const closePromise = new PromiseSource(); + + let nextRetryDelayCalledPromise = new PromiseSource(); + + let lastRetryCount = -1; + let lastElapsedMs = -1; + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) { + lastRetryCount = previousRetryCount; + lastElapsedMs = elapsedMilliseconds; + nextRetryDelayCalledPromise.resolve(); + + return previousRetryCount === 0 ? 0 : null; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + }); + + hubConnection.onclose(() => { + closeCount++; + closePromise.resolve(); + }); + + await hubConnection.start(); + + connection.start = () => { + return Promise.reject("Reconnect attempt failed"); + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + nextRetryDelayCalledPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Disconnected); + expect(lastRetryCount).toBe(1); + expect(lastElapsedMs).toBeGreaterThanOrEqual(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(1); + }); + }); + + it("can reconnect multiple times", async () => { + await VerifyLogger.run(async (logger) => { + let reconnectedPromise = new PromiseSource(); + let nextRetryDelayCalledPromise = new PromiseSource(); + + let lastRetryCount = -1; + let lastElapsedMs = -1; + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) { + lastRetryCount = previousRetryCount; + lastElapsedMs = elapsedMilliseconds; + nextRetryDelayCalledPromise.resolve(); + return 0; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + reconnectedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + await hubConnection.start(); + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + nextRetryDelayCalledPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await reconnectedPromise; + reconnectedPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Connected); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(0); + + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(2); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(0); + + await reconnectedPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Connected); + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(2); + expect(onreconnectedCount).toBe(2); + expect(closeCount).toBe(0); + + await hubConnection.stop(); + + expect(lastRetryCount).toBe(0); + expect(lastElapsedMs).toBe(0); + expect(onreconnectingCount).toBe(2); + expect(onreconnectedCount).toBe(2); + expect(closeCount).toBe(1); + }); + }); + + it("does not transition into the reconnecting state if the first retry delay is null", async () => { + await VerifyLogger.run(async (logger) => { + const closePromise = new PromiseSource(); + + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + // Note the [] parameter to the DefaultReconnectPolicy. + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy([])); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + }); + + hubConnection.onclose(() => { + closeCount++; + closePromise.resolve(); + }); + + await hubConnection.start(); + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await closePromise; + + expect(hubConnection.state).toBe(HubConnectionState.Disconnected); + expect(onreconnectingCount).toBe(0); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(1); + }); + }); + + it("does not transition into the reconnecting state if the connection is lost during initial handshake", async () => { + await VerifyLogger.run(async (logger) => { + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + // Disable autoHandshake in TestConnection + const connection = new TestConnection(false); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy()); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + const startPromise = hubConnection.start(); + + expect(hubConnection.state).toBe(HubConnectionState.Connecting); + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await expect(startPromise).rejects.toThrow("Connection lost"); + + expect(onreconnectingCount).toBe(0); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + }); + }); + + it("continues reconnecting state if the connection is lost during a reconnecting handshake", async () => { + await VerifyLogger.run(async (logger) => { + const reconnectedPromise = new PromiseSource(); + let nextRetryDelayCalledPromise = new PromiseSource(); + + let lastRetryCount = 0; + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + // Disable autoHandshake in TestConnection + const connection = new TestConnection(false); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds(previousRetryCount: number) { + lastRetryCount = previousRetryCount; + nextRetryDelayCalledPromise.resolve(); + return 0; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + reconnectedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + const startPromise = hubConnection.start(); + // Manually complete handshake. + connection.receive({}); + await startPromise; + + let replacedStartCalledPromise = new PromiseSource(); + connection.start = () => { + replacedStartCalledPromise.resolve(); + return Promise.resolve(); + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + nextRetryDelayCalledPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await replacedStartCalledPromise; + replacedStartCalledPromise = new PromiseSource(); + + // Fail underlying connection during reconnect during handshake + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await replacedStartCalledPromise; + + // Manually complete handshake. + connection.receive({}); + + await reconnectedPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Connected); + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(0); + + await hubConnection.stop(); + + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(1); + }); + }); + + it("continues reconnecting state if invalid handshake response received", async () => { + await VerifyLogger.run(async (logger) => { + const reconnectedPromise = new PromiseSource(); + let nextRetryDelayCalledPromise = new PromiseSource(); + + let lastRetryCount = 0; + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + // Disable autoHandshake in TestConnection + const connection = new TestConnection(false); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds(previousRetryCount: number) { + lastRetryCount = previousRetryCount; + nextRetryDelayCalledPromise.resolve(); + return 0; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + reconnectedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + const startPromise = hubConnection.start(); + // Manually complete handshake. + connection.receive({}); + await startPromise; + + let replacedStartCalledPromise = new PromiseSource(); + connection.start = () => { + replacedStartCalledPromise.resolve(); + return Promise.resolve(); + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + nextRetryDelayCalledPromise = new PromiseSource(); + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(0); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await replacedStartCalledPromise; + replacedStartCalledPromise = new PromiseSource(); + + // Manually fail handshake + expect(() => connection.receive({ error: "invalid" })).toThrow("invalid"); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await replacedStartCalledPromise; + + // Manually complete handshake. + connection.receive({}); + + await reconnectedPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Connected); + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(0); + + await hubConnection.stop(); + + expect(lastRetryCount).toBe(1); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(1); + expect(closeCount).toBe(1); + }, + "Server returned handshake error: invalid"); + }); + + it("can be stopped while restarting the underlying connection", async () => { + await VerifyLogger.run(async (logger) => { + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy([0])); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + await hubConnection.start(); + + const stopCalledPromise = new PromiseSource(); + let stopPromise: Promise; + + connection.start = () => { + stopCalledPromise.resolve(); + stopPromise = hubConnection.stop(); + return Promise.resolve(); + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await stopCalledPromise; + await stopPromise!; + + expect(hubConnection.state).toBe(HubConnectionState.Disconnected); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(1); + }); + }); + + it("can be stopped during a restart handshake", async () => { + await VerifyLogger.run(async (logger) => { + const closedPromise = new PromiseSource(); + const nextRetryDelayCalledPromise = new PromiseSource(); + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + // Disable autoHandshake in TestConnection + const connection = new TestConnection(false); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds() { + nextRetryDelayCalledPromise.resolve(); + return 0; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + closedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + const startPromise = hubConnection.start(); + // Manually complete handshake. + connection.receive({}); + await startPromise; + + const replacedSendCalledPromise = new PromiseSource(); + connection.send = () => { + replacedSendCalledPromise.resolve(); + return Promise.resolve(); + }; + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + // Wait for the handshake to actually started. Right now, we're awaiting the 0ms delay. + await replacedSendCalledPromise; + + await hubConnection.stop(); + + expect(hubConnection.state).toBe(HubConnectionState.Disconnected); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(1); + }); + }); + + it("can be stopped during a reconnect delay", async () => { + await VerifyLogger.run(async (logger) => { + const closedPromise = new PromiseSource(); + const nextRetryDelayCalledPromise = new PromiseSource(); + let onreconnectingCount = 0; + let onreconnectedCount = 0; + let closeCount = 0; + + const connection = new TestConnection(); + const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), { + nextRetryDelayInMilliseconds() { + nextRetryDelayCalledPromise.resolve(); + // 60s is hopefully longer than this test could ever take. + return 60 * 1000; + }, + }); + + hubConnection.onreconnecting(() => { + onreconnectingCount++; + }); + + hubConnection.onreconnected(() => { + onreconnectedCount++; + closedPromise.resolve(); + }); + + hubConnection.onclose(() => { + closeCount++; + }); + + await hubConnection.start(); + + // Typically this would be called by the transport + connection.onclose!(new Error("Connection lost")); + + await nextRetryDelayCalledPromise; + + expect(hubConnection.state).toBe(HubConnectionState.Reconnecting); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(0); + + await hubConnection.stop(); + + expect(hubConnection.state).toBe(HubConnectionState.Disconnected); + expect(onreconnectingCount).toBe(1); + expect(onreconnectedCount).toBe(0); + expect(closeCount).toBe(1); + }); + }); +}); diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts index 4f5d24ee9c..257f2e174d 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts @@ -13,6 +13,7 @@ import { Subject } from "../src/Subject"; import { TextMessageFormat } from "../src/TextMessageFormat"; import { VerifyLogger } from "./Common"; +import { TestConnection } from "./TestConnection"; import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils"; function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) { @@ -22,7 +23,6 @@ function createHubConnection(connection: IConnection, logger?: ILogger | null, p registerUnhandledRejectionHandler(); describe("HubConnection", () => { - describe("start", () => { it("sends handshake message", async () => { await VerifyLogger.run(async (logger) => { @@ -462,7 +462,7 @@ describe("HubConnection", () => { const invokePromise = hubConnection.invoke("testMethod"); await hubConnection.stop(); - await expect(invokePromise).rejects.toThrow("Invocation canceled due to connection being closed."); + await expect(invokePromise).rejects.toThrow("Invocation canceled due to the underlying connection being closed."); }); }); @@ -691,9 +691,9 @@ describe("HubConnection", () => { } await expect(startPromise) .rejects - .toBe("Server returned handshake error: Error!"); + .toThrow("Server returned handshake error: Error!"); - expect(closeError!.message).toEqual("Server returned handshake error: Error!"); + expect(closeError).toEqual(undefined); } finally { await hubConnection.stop(); } @@ -962,7 +962,7 @@ describe("HubConnection", () => { .subscribe(observer); await hubConnection.stop(); - await expect(observer.completed).rejects.toThrow("Error: Invocation canceled due to connection being closed."); + await expect(observer.completed).rejects.toThrow("Error: Invocation canceled due to the underlying connection being closed."); } finally { await hubConnection.stop(); } @@ -1095,6 +1095,8 @@ describe("HubConnection", () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); + await hubConnection.start(); + try { let invocations = 0; hubConnection.onclose((e) => invocations++); @@ -1112,6 +1114,8 @@ describe("HubConnection", () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); + await hubConnection.start(); + try { let error: Error | undefined; hubConnection.onclose((e) => error = e); @@ -1133,6 +1137,7 @@ describe("HubConnection", () => { hubConnection.onclose(null!); hubConnection.onclose(undefined!); // Typically this would be called by the transport + (hubConnection as any).connectionState = HubConnectionState.Connected; connection.onclose!(); // expect no errors } finally { @@ -1145,6 +1150,8 @@ describe("HubConnection", () => { await VerifyLogger.run(async (logger) => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); + await hubConnection.start(); + try { let state: HubConnectionState | undefined; hubConnection.onclose((e) => state = hubConnection.state); @@ -1235,77 +1242,6 @@ async function pingAndWait(connection: TestConnection): Promise { await delayUntil(50); } -class TestConnection implements IConnection { - public readonly features: any = {}; - - public onreceive: ((data: string | ArrayBuffer) => void) | null; - public onclose: ((error?: Error) => void) | null; - public sentData: any[]; - public lastInvocationId: string | null; - - private autoHandshake: boolean | null; - - constructor(autoHandshake: boolean = true) { - this.onreceive = null; - this.onclose = null; - this.sentData = []; - this.lastInvocationId = null; - this.autoHandshake = autoHandshake; - } - - public start(): Promise { - return Promise.resolve(); - } - - public send(data: any): Promise { - const invocation = TextMessageFormat.parse(data)[0]; - const parsedInvocation = JSON.parse(invocation); - const invocationId = parsedInvocation.invocationId; - if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) { - this.receiveHandshakeResponse(); - } - if (invocationId) { - this.lastInvocationId = invocationId; - } - if (this.sentData) { - this.sentData.push(invocation); - } else { - this.sentData = [invocation]; - } - return Promise.resolve(); - } - - public stop(error?: Error): Promise { - if (this.onclose) { - this.onclose(error); - } - return Promise.resolve(); - } - - public receiveHandshakeResponse(error?: string): void { - this.receive({ error }); - } - - public receive(data: any): void { - const payload = JSON.stringify(data); - this.invokeOnReceive(TextMessageFormat.write(payload)); - } - - public receiveText(data: string) { - this.invokeOnReceive(data); - } - - public receiveBinary(data: ArrayBuffer) { - this.invokeOnReceive(data); - } - - private invokeOnReceive(data: string | ArrayBuffer) { - if (this.onreceive) { - this.onreceive(data); - } - } -} - class TestProtocol implements IHubProtocol { public readonly name: string = "TestProtocol"; public readonly version: number = 1; diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnectionBuilder.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnectionBuilder.test.ts index e33d8c26a3..6ee3fbb0af 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnectionBuilder.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnectionBuilder.test.ts @@ -1,8 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy"; import { HttpRequest, HttpResponse } from "../src/HttpClient"; -import { HubConnection } from "../src/HubConnection"; +import { HubConnection, HubConnectionState } from "../src/HubConnection"; import { HubConnectionBuilder } from "../src/HubConnectionBuilder"; import { IHttpConnectionOptions } from "../src/IHttpConnectionOptions"; import { HubMessage, IHubProtocol } from "../src/IHubProtocol"; @@ -63,18 +64,10 @@ describe("HubConnectionBuilder", () => { }) .build(); - // Start the connection - const closed = makeClosedPromise(connection); + await expect(connection.start()).rejects.toThrow("The underlying connection was closed before the hub handshake could complete."); + expect(connection.state).toBe(HubConnectionState.Disconnected); - const startPromise = connection.start(); - - const pollRequest = await pollSent.promise; - expect(pollRequest.url).toMatch(/http:\/\/example.com\?id=abc123.*/); - - await closed; - try { - await startPromise; - } catch { } + expect((await pollSent.promise).url).toMatch(/http:\/\/example.com\?id=abc123.*/); }); }); @@ -93,11 +86,11 @@ describe("HubConnectionBuilder", () => { const pollSent = new PromiseSource(); const pollCompleted = new PromiseSource(); - const negotiateReceived = new PromiseSource(); + let negotiateRequest!: HttpRequest; const testClient = createTestClient(pollSent, pollCompleted.promise) .on("POST", "http://example.com?id=abc123", (req) => { // Respond from the poll with the handshake response - negotiateReceived.resolve(req); + negotiateRequest = req; pollCompleted.resolve(new HttpResponse(204, "No Content", "{}")); return new HttpResponse(202); }); @@ -111,18 +104,10 @@ describe("HubConnectionBuilder", () => { .withHubProtocol(protocol) .build(); - // Start the connection - const closed = makeClosedPromise(connection); + await expect(connection.start()).rejects.toThrow("The underlying connection was closed before the hub handshake could complete."); + expect(connection.state).toBe(HubConnectionState.Disconnected); - const startPromise = connection.start(); - - const negotiateRequest = await negotiateReceived.promise; expect(negotiateRequest.content).toBe(`{"protocol":"${protocol.name}","version":1}\x1E`); - - await closed; - try { - await startPromise; - } catch { } }); }); @@ -219,6 +204,54 @@ describe("HubConnectionBuilder", () => { expect(httpConnectionLogger.messages).toContain("Starting connection with transfer format 'Text'."); expect(hubConnectionLogger.messages).not.toContain("Starting connection with transfer format 'Text'."); }); + + it("reconnectPolicy undefined by default", () => { + const builder = new HubConnectionBuilder().withUrl("http://example.com"); + expect(builder.reconnectPolicy).toBeUndefined(); + }); + + it("withAutomaticReconnect throws if reconnectPolicy is already set", () => { + const builder = new HubConnectionBuilder().withAutomaticReconnect(); + expect(() => builder.withAutomaticReconnect()).toThrow("A reconnectPolicy has already been set."); + }); + + it("withAutomaticReconnect uses default retryDelays when called with no arguments", () => { + // From DefaultReconnectPolicy.ts + const DEFAULT_RETRY_DELAYS_IN_MILLISECONDS = [0, 2000, 10000, 30000, null]; + const builder = new HubConnectionBuilder() + .withAutomaticReconnect(); + + let retryCount = 0; + for (const delay of DEFAULT_RETRY_DELAYS_IN_MILLISECONDS) { + expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay); + } + }); + + it("withAutomaticReconnect uses custom retryDelays when provided", () => { + const customRetryDelays = [ 3, 1, 4, 1, 5, 9 ]; + const builder = new HubConnectionBuilder() + .withAutomaticReconnect(customRetryDelays); + + let retryCount = 0; + for (const delay of customRetryDelays) { + expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay); + } + + expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount, 0)).toBe(null); + }); + + it("withAutomaticReconnect uses a custom IReconnectPolicy when provided", () => { + const customRetryDelays = [ 127, 0, 0, 1 ]; + const builder = new HubConnectionBuilder() + .withAutomaticReconnect(new DefaultReconnectPolicy(customRetryDelays)); + + let retryCount = 0; + for (const delay of customRetryDelays) { + expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay); + } + + expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount, 0)).toBe(null); + }); }); class CaptureLogger implements ILogger { @@ -263,18 +296,6 @@ function createTestClient(pollSent: PromiseSource, pollCompleted: P }); } -function makeClosedPromise(connection: HubConnection): Promise { - const closed = new PromiseSource(); - connection.onclose((error) => { - if (error) { - closed.reject(error); - } else { - closed.resolve(); - } - }); - return closed.promise; -} - function eachMissingValue(callback: (val: undefined | null, name: string) => void) { callback(null, "null"); callback(undefined, "undefined"); diff --git a/src/SignalR/clients/ts/signalr/tests/TestConnection.ts b/src/SignalR/clients/ts/signalr/tests/TestConnection.ts new file mode 100644 index 0000000000..60e45cc641 --- /dev/null +++ b/src/SignalR/clients/ts/signalr/tests/TestConnection.ts @@ -0,0 +1,78 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +import { IConnection } from "../src/IConnection"; +import { TextMessageFormat } from "../src/TextMessageFormat"; + +export class TestConnection implements IConnection { + public readonly features: any = {}; + public connectionId?: string; + + public onreceive: ((data: string | ArrayBuffer) => void) | null; + public onclose: ((error?: Error) => void) | null; + + public sentData: any[]; + public lastInvocationId: string | null; + + private autoHandshake: boolean | null; + + constructor(autoHandshake: boolean = true) { + this.onreceive = null; + this.onclose = null; + this.sentData = []; + this.lastInvocationId = null; + this.autoHandshake = autoHandshake; + } + + public start(): Promise { + return Promise.resolve(); + } + + public send(data: any): Promise { + const invocation = TextMessageFormat.parse(data)[0]; + const parsedInvocation = JSON.parse(invocation); + const invocationId = parsedInvocation.invocationId; + if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) { + this.receiveHandshakeResponse(); + } + if (invocationId) { + this.lastInvocationId = invocationId; + } + if (this.sentData) { + this.sentData.push(invocation); + } else { + this.sentData = [invocation]; + } + return Promise.resolve(); + } + + public stop(error?: Error): Promise { + if (this.onclose) { + this.onclose(error); + } + return Promise.resolve(); + } + + public receiveHandshakeResponse(error?: string): void { + this.receive({ error }); + } + + public receive(data: any): void { + const payload = JSON.stringify(data); + this.invokeOnReceive(TextMessageFormat.write(payload)); + } + + public receiveText(data: string) { + this.invokeOnReceive(data); + } + + public receiveBinary(data: ArrayBuffer) { + this.invokeOnReceive(data); + } + + private invokeOnReceive(data: string | ArrayBuffer) { + if (this.onreceive) { + this.onreceive(data); + } + } +} diff --git a/src/SignalR/clients/ts/signalr/tests/TestEventSource.ts b/src/SignalR/clients/ts/signalr/tests/TestEventSource.ts index f0828a544e..d77f350616 100644 --- a/src/SignalR/clients/ts/signalr/tests/TestEventSource.ts +++ b/src/SignalR/clients/ts/signalr/tests/TestEventSource.ts @@ -25,12 +25,18 @@ export class TestEventSource { return this._onopen!; } + public static eventSourceSet: PromiseSource; public static eventSource: TestEventSource; public closed: boolean = false; constructor(url: string, eventSourceInitDict?: EventSourceInit) { this.url = url; + TestEventSource.eventSource = this; + + if (TestEventSource.eventSourceSet) { + TestEventSource.eventSourceSet.resolve(); + } } public close(): void { diff --git a/src/SignalR/clients/ts/signalr/tests/TestWebSocket.ts b/src/SignalR/clients/ts/signalr/tests/TestWebSocket.ts index b643958b4e..1eaf631d5d 100644 --- a/src/SignalR/clients/ts/signalr/tests/TestWebSocket.ts +++ b/src/SignalR/clients/ts/signalr/tests/TestWebSocket.ts @@ -7,13 +7,13 @@ export class TestWebSocket { public binaryType: "blob" | "arraybuffer" = "blob"; public bufferedAmount: number = 0; public extensions: string = ""; - public onclose!: ((this: WebSocket, ev: CloseEvent) => any); public onerror!: ((this: WebSocket, ev: Event) => any); public onmessage!: ((this: WebSocket, ev: MessageEvent) => any); public protocol: string; public readyState: number = 1; public url: string; + public static webSocketSet: PromiseSource; public static webSocket: TestWebSocket; public receivedData: Array<(string | ArrayBuffer | Blob | ArrayBufferView)>; @@ -29,6 +29,18 @@ export class TestWebSocket { return this._onopen!; } + // tslint:disable-next-line:variable-name + private _onclose?: (this: WebSocket, evt: Event) => any; + public closeSet: PromiseSource = new PromiseSource(); + public set onclose(value: (this: WebSocket, evt: Event) => any) { + this._onclose = value; + this.closeSet.resolve(); + } + + public get onclose(): (this: WebSocket, evt: Event) => any { + return this._onclose!; + } + public close(code?: number | undefined, reason?: string | undefined): void { const closeEvent = new TestCloseEvent(); closeEvent.code = code || 1000; @@ -58,8 +70,13 @@ export class TestWebSocket { constructor(url: string, protocols?: string | string[]) { this.url = url; this.protocol = protocols ? (typeof protocols === "string" ? protocols : protocols[0]) : ""; - TestWebSocket.webSocket = this; this.receivedData = []; + + TestWebSocket.webSocket = this; + + if (TestWebSocket.webSocketSet) { + TestWebSocket.webSocketSet.resolve(); + } } public readonly CLOSED: number = 1; diff --git a/src/SignalR/clients/ts/signalr/tests/WebSocketTransport.test.ts b/src/SignalR/clients/ts/signalr/tests/WebSocketTransport.test.ts index 4d727ee8ac..8c610d523a 100644 --- a/src/SignalR/clients/ts/signalr/tests/WebSocketTransport.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/WebSocketTransport.test.ts @@ -53,7 +53,7 @@ describe("WebSocketTransport", () => { connectComplete = true; })(); - await TestWebSocket.webSocket.openSet; + await TestWebSocket.webSocket.closeSet; expect(connectComplete).toBe(false); diff --git a/src/SignalR/samples/SignalRSamples/wwwroot/hubs.html b/src/SignalR/samples/SignalRSamples/wwwroot/hubs.html index 8c7092740e..859cac97bf 100644 --- a/src/SignalR/samples/SignalRSamples/wwwroot/hubs.html +++ b/src/SignalR/samples/SignalRSamples/wwwroot/hubs.html @@ -29,6 +29,9 @@ + + + Enable automatic reconnects @@ -105,6 +108,7 @@ let connectButton = document.getElementById('connect'); let disconnectButton = document.getElementById('disconnect'); + let autoReconnectCheckbox = document.getElementById('autoReconnect'); let broadcastButton = document.getElementById('broadcast'); let broadcastExceptMeButton = document.getElementById('broadcast-exceptme'); let sendToConnectionButton = document.getElementById('connection-send'); @@ -145,11 +149,18 @@ } console.log('http://' + document.location.host + '/' + hubRoute); - connection = new signalR.HubConnectionBuilder() + + var connectionBuilder = new signalR.HubConnectionBuilder() .configureLogging(signalR.LogLevel.Trace) .withUrl(hubRoute, options) - .withHubProtocol(protocol) - .build(); + .withHubProtocol(protocol); + + if (autoReconnectCheckbox.checked) { + connectionBuilder.withAutomaticReconnect(); + } + + connection = connectionBuilder.build(); + connection.on('Send', function (msg) { addLine('message-list', msg); }); @@ -164,6 +175,14 @@ updateButtonState(false); }); + connection.onreconnecting(function (e) { + addLine('message-list', 'Connection reconnecting: ' + e, 'orange'); + }); + + connection.onreconnected(function (e) { + addLine('message-list', 'Connection reconnected!', 'green'); + }); + connection.start() .then(function () { isConnected = true;