Implement auto reconnect for SignalR TypeScript client (#8566)

This commit is contained in:
Stephen Halter 2019-04-08 07:33:20 -07:00 committed by GitHub
parent b93bc433db
commit 74bba27f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1605 additions and 227 deletions

View File

@ -0,0 +1,31 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "attach",
"name": "Node - Attach by Process ID",
"processId": "${command:PickProcess}"
},
{
"type": "node",
"request": "launch",
"name": "Jest - All",
"program": "${workspaceFolder}/common/node_modules/jest/bin/jest",
"cwd": "${workspaceFolder}",
"args": ["--runInBand"],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
},
{
"type": "node",
"request": "launch",
"name": "Jest - Current File",
"program": "${workspaceFolder}/common/node_modules/jest/bin/jest",
"cwd": "${workspaceFolder}",
"args": ["${fileBasename}"],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
}
]
}

View File

@ -26,9 +26,6 @@
<Target Name="RunBrowserTests">
<Message Text="Running JavaScript client Browser tests" Importance="high" />
<Yarn Command="run test:inner -- --no-color --configuration $(Configuration)" WorkingDirectory="$(RepositoryRoot)src/SignalR/clients/ts/FunctionalTests" />
<Message Text="Running JavaScript tests" Importance="high" />
<!-- Skip the "inner" test run when we're running DailyTests -->
<Yarn Command="run test:inner -- --no-color --configuration $(Configuration)"
Condition="'$(DailyTests)' != 'true'"

View File

@ -6,6 +6,7 @@ using System.IdentityModel.Tokens.Jwt;
using System.IO;
using System.Reflection;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Builder;
@ -29,6 +30,8 @@ namespace FunctionalTests
private readonly SymmetricSecurityKey SecurityKey = new SymmetricSecurityKey(Guid.NewGuid().ToByteArray());
private readonly JwtSecurityTokenHandler JwtTokenHandler = new JwtSecurityTokenHandler();
private int _numRedirects;
public void ConfigureServices(IServiceCollection services)
{
services.AddConnections();
@ -126,6 +129,17 @@ namespace FunctionalTests
app.UseRouting();
app.Use((context, next) =>
{
if (context.Request.Path.StartsWithSegments("/redirect"))
{
var newUrl = context.Request.Query["baseUrl"] + "/testHub?numRedirects=" + Interlocked.Increment(ref _numRedirects);
return context.Response.WriteAsync($"{{ \"url\": \"{newUrl}\" }}");
}
return next();
});
app.Use(async (context, next) =>
{
if (context.Request.Path.Value.Contains("/negotiate"))

View File

@ -34,6 +34,11 @@ namespace FunctionalTests
return message;
}
public int GetNumRedirects()
{
return int.Parse(Context.GetHttpContext().Request.Query["numRedirects"]);
}
public void ThrowException(string message)
{
throw new InvalidOperationException(message);

View File

@ -11,12 +11,15 @@ import { eachTransport, eachTransportAndProtocol, ENDPOINT_BASE_HTTPS_URL, ENDPO
import "./LogBannerReporter";
import { TestLogger } from "./TestLogger";
import { PromiseSource } from "../../signalr/tests/Utils";
import * as RX from "rxjs";
const TESTHUBENDPOINT_URL = ENDPOINT_BASE_URL + "/testhub";
const TESTHUBENDPOINT_HTTPS_URL = ENDPOINT_BASE_HTTPS_URL ? (ENDPOINT_BASE_HTTPS_URL + "/testhub") : undefined;
const TESTHUB_NOWEBSOCKETS_ENDPOINT_URL = ENDPOINT_BASE_URL + "/testhub-nowebsockets";
const TESTHUB_REDIRECT_ENDPOINT_URL = ENDPOINT_BASE_URL + "/redirect?numRedirects=0&baseUrl=" + ENDPOINT_BASE_URL;
const commonOptions: IHttpConnectionOptions = {
logMessageContent: true,
@ -421,16 +424,25 @@ describe("hubConnection", () => {
});
});
it("closed with error if hub cannot be created", (done) => {
it("closed with error or start fails if hub cannot be created", async (done) => {
const hubConnection = getConnectionBuilder(transportType, ENDPOINT_BASE_URL + "/uncreatable")
.withHubProtocol(protocol)
.build();
const expectedErrorMessage = "Server returned an error on close: Connection closed with an error. InvalidOperationException: Unable to resolve service for type 'System.Object' while attempting to activate 'FunctionalTests.UncreatableHub'.";
// Either start will fail or onclose will be called. Never both.
hubConnection.onclose((error) => {
expect(error!.message).toEqual("Server returned an error on close: Connection closed with an error. InvalidOperationException: Unable to resolve service for type 'System.Object' while attempting to activate 'FunctionalTests.UncreatableHub'.");
expect(error!.message).toEqual(expectedErrorMessage);
done();
});
hubConnection.start();
try {
await hubConnection.start();
} catch (error) {
expect(error!.message).toEqual(expectedErrorMessage);
done();
}
});
it("can handle different types", (done) => {
@ -696,9 +708,136 @@ describe("hubConnection", () => {
await hubConnection.stop();
done();
});
it("can reconnect", async (done) => {
try {
const reconnectingPromise = new PromiseSource();
const reconnectedPromise = new PromiseSource<string | undefined>();
const hubConnection = getConnectionBuilder(transportType)
.withAutomaticReconnect()
.build();
hubConnection.onreconnecting(() => {
reconnectingPromise.resolve();
});
hubConnection.onreconnected((connectionId?) => {
reconnectedPromise.resolve(connectionId);
});
await hubConnection.start();
const initialConnectionId = (hubConnection as any).connection.connectionId as string;
// Induce reconnect
(hubConnection as any).serverTimeout();
await reconnectingPromise;
const newConnectionId = await reconnectedPromise;
expect(newConnectionId).not.toBe(initialConnectionId);
const response = await hubConnection.invoke("Echo", "test");
expect(response).toEqual("test");
await hubConnection.stop();
done();
} catch (err) {
fail(err);
done();
}
});
});
});
it("can reconnect after negotiate redirect", async (done) => {
try {
const reconnectingPromise = new PromiseSource();
const reconnectedPromise = new PromiseSource<string | undefined>();
const hubConnection = getConnectionBuilder(undefined, TESTHUB_REDIRECT_ENDPOINT_URL)
.withAutomaticReconnect()
.build();
hubConnection.onreconnecting(() => {
reconnectingPromise.resolve();
});
hubConnection.onreconnected((connectionId?) => {
reconnectedPromise.resolve(connectionId);
});
await hubConnection.start();
const preReconnectRedirects = await hubConnection.invoke<number>("GetNumRedirects");
const initialConnectionId = (hubConnection as any).connection.connectionId as string;
// Induce reconnect
(hubConnection as any).serverTimeout();
await reconnectingPromise;
const newConnectionId = await reconnectedPromise;
expect(newConnectionId).not.toBe(initialConnectionId);
const postReconnectRedirects = await hubConnection.invoke<number>("GetNumRedirects");
expect(postReconnectRedirects).toBeGreaterThan(preReconnectRedirects);
await hubConnection.stop();
done();
} catch (err) {
fail(err);
done();
}
});
it("can reconnect after skipping negotiation", async (done) => {
try {
const reconnectingPromise = new PromiseSource();
const reconnectedPromise = new PromiseSource<string | undefined>();
const hubConnection = getConnectionBuilder(
HttpTransportType.WebSockets,
undefined,
{ skipNegotiation: true },
)
.withAutomaticReconnect()
.build();
hubConnection.onreconnecting(() => {
reconnectingPromise.resolve();
});
hubConnection.onreconnected((connectionId?) => {
reconnectedPromise.resolve(connectionId);
});
await hubConnection.start();
// Induce reconnect
(hubConnection as any).serverTimeout();
await reconnectingPromise;
const newConnectionId = await reconnectedPromise;
expect(newConnectionId).toBeUndefined();
const response = await hubConnection.invoke("Echo", "test");
expect(response).toEqual("test");
await hubConnection.stop();
done();
} catch (err) {
fail(err);
done();
}
});
if (typeof EventSource !== "undefined") {
it("allows Server-Sent Events when negotiating for JSON protocol", async (done) => {
const hubConnection = getConnectionBuilder(undefined, TESTHUB_NOWEBSOCKETS_ENDPOINT_URL)

View File

@ -0,0 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { IReconnectPolicy } from "./IReconnectPolicy";
// 0, 2, 10, 30 second delays before reconnect attempts.
const DEFAULT_RETRY_DELAYS_IN_MILLISECONDS = [0, 2000, 10000, 30000, null];
/** @private */
export class DefaultReconnectPolicy implements IReconnectPolicy {
private readonly retryDelays: Array<number | null>;
constructor(retryDelays?: number[]) {
this.retryDelays = retryDelays !== undefined ? [...retryDelays, null] : DEFAULT_RETRY_DELAYS_IN_MILLISECONDS;
}
public nextRetryDelayInMilliseconds(previousRetryCount: number): number | null {
return this.retryDelays[previousRetryCount];
}
}

View File

@ -14,9 +14,10 @@ import { WebSocketTransport } from "./WebSocketTransport";
/** @private */
const enum ConnectionState {
Connecting,
Connected,
Disconnected,
Connecting = "Connecting ",
Connected = "Connected",
Disconnected = "Disconnected",
Disconnecting = "Disconnecting",
}
/** @private */
@ -49,16 +50,22 @@ if (Platform.isNode && typeof require !== "undefined") {
/** @private */
export class HttpConnection implements IConnection {
private connectionState: ConnectionState;
private baseUrl: string;
// connectionStarted is tracked independently from connectionState, so we can check if the
// connection ever did successfully transition from connecting to connected before disconnecting.
private connectionStarted: boolean;
private readonly baseUrl: string;
private readonly httpClient: HttpClient;
private readonly logger: ILogger;
private readonly options: IHttpConnectionOptions;
private transport?: ITransport;
private startPromise?: Promise<void>;
private startInternalPromise?: Promise<void>;
private stopPromise?: Promise<void>;
private stopPromiseResolver!: (value?: PromiseLike<void>) => void;
private stopError?: Error;
private accessTokenFactory?: () => string | Promise<string>;
public readonly features: any = {};
public connectionId?: string;
public onreceive: ((data: string | ArrayBuffer) => void) | null;
public onclose: ((e?: Error) => void) | null;
@ -89,14 +96,16 @@ export class HttpConnection implements IConnection {
this.httpClient = options.httpClient || new DefaultHttpClient(this.logger);
this.connectionState = ConnectionState.Disconnected;
this.connectionStarted = false;
this.options = options;
this.onreceive = null;
this.onclose = null;
}
public start(): Promise<void>;
public start(transferFormat: TransferFormat): Promise<void>;
public start(transferFormat?: TransferFormat): Promise<void> {
public async start(transferFormat?: TransferFormat): Promise<void> {
transferFormat = transferFormat || TransferFormat.Binary;
Arg.isIn(transferFormat, TransferFormat, "transferFormat");
@ -104,13 +113,32 @@ export class HttpConnection implements IConnection {
this.logger.log(LogLevel.Debug, `Starting connection with transfer format '${TransferFormat[transferFormat]}'.`);
if (this.connectionState !== ConnectionState.Disconnected) {
return Promise.reject(new Error("Cannot start a connection that is not in the 'Disconnected' state."));
return Promise.reject(new Error("Cannot start an HttpConnection that is not in the 'Disconnected' state."));
}
this.connectionState = ConnectionState.Connecting;
this.startPromise = this.startInternal(transferFormat);
return this.startPromise;
this.startInternalPromise = this.startInternal(transferFormat);
await this.startInternalPromise;
// The TypeScript compiler thinks that connectionState must be Connecting here. The TypeScript compiler is wrong.
if (this.connectionState as any === ConnectionState.Disconnecting) {
// stop() was called and transitioned the client into the Disconnecting state.
const message = "Failed to start the HttpConnection before stop() was called.";
this.logger.log(LogLevel.Error, message);
// We cannot await stopPromise inside startInternal since stopInternal awaits the startInternalPromise.
await this.stopPromise;
return Promise.reject(new Error(message));
} else if (this.connectionState as any !== ConnectionState.Connected) {
// stop() was called and transitioned the client into the Disconnecting state.
const message = "HttpConnection.startInternal completed gracefully but didn't enter the connection into the connected state!";
this.logger.log(LogLevel.Error, message);
return Promise.reject(new Error(message));
}
this.connectionStarted = true;
}
public send(data: string | ArrayBuffer): Promise<void> {
@ -123,22 +151,55 @@ export class HttpConnection implements IConnection {
}
public async stop(error?: Error): Promise<void> {
this.connectionState = ConnectionState.Disconnected;
if (this.connectionState === ConnectionState.Disconnected) {
this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnected state.`);
return Promise.resolve();
}
if (this.connectionState === ConnectionState.Disconnecting) {
this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`);
return this.stopPromise;
}
this.connectionState = ConnectionState.Disconnecting;
this.stopPromise = new Promise((resolve) => {
// Don't complete stop() until stopConnection() completes.
this.stopPromiseResolver = resolve;
});
// stopInternal should never throw so just observe it.
await this.stopInternal(error);
await this.stopPromise;
}
private async stopInternal(error?: Error): Promise<void> {
// Set error as soon as possible otherwise there is a race between
// the transport closing and providing an error and the error from a close message
// We would prefer the close message error.
this.stopError = error;
try {
await this.startPromise;
await this.startInternalPromise;
} catch (e) {
// this exception is returned to the user as a rejected Promise from the start method
// This exception is returned to the user as a rejected Promise from the start method.
}
// The transport's onclose will trigger stopConnection which will run our onclose event.
// The transport should always be set if currently connected. If it wasn't set, it's likely because
// stop was called during start() and start() failed.
if (this.transport) {
await this.transport.stop();
try {
await this.transport.stop();
} catch (e) {
this.logger.log(LogLevel.Error, `HttpConnection.transport.stop() threw error '${e}'.`);
this.stopConnection();
}
this.transport = undefined;
} else {
this.logger.log(LogLevel.Debug, "HttpConnection.transport is undefined in HttpConnection.stop() because start() failed.");
this.stopConnection();
}
}
@ -157,7 +218,7 @@ export class HttpConnection implements IConnection {
// No fallback or negotiate in this case.
await this.transport!.connect(url, transferFormat);
} else {
return Promise.reject(new Error("Negotiation can only be skipped when using the WebSocket transport directly."));
throw new Error("Negotiation can only be skipped when using the WebSocket transport directly.");
}
} else {
let negotiateResponse: INegotiateResponse | null = null;
@ -166,16 +227,16 @@ export class HttpConnection implements IConnection {
do {
negotiateResponse = await this.getNegotiationResponse(url);
// the user tries to stop the connection when it is being started
if (this.connectionState === ConnectionState.Disconnected) {
return;
if (this.connectionState === ConnectionState.Disconnecting || this.connectionState === ConnectionState.Disconnected) {
throw new Error("The connection was stopped during negotiation.");
}
if (negotiateResponse.error) {
return Promise.reject(new Error(negotiateResponse.error));
throw new Error(negotiateResponse.error);
}
if ((negotiateResponse as any).ProtocolVersion) {
return Promise.reject(new Error("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details."));
throw new Error("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details.");
}
if (negotiateResponse.url) {
@ -194,9 +255,11 @@ export class HttpConnection implements IConnection {
while (negotiateResponse.url && redirects < MAX_REDIRECTS);
if (redirects === MAX_REDIRECTS && negotiateResponse.url) {
return Promise.reject(new Error("Negotiate redirection limit exceeded."));
throw new Error("Negotiate redirection limit exceeded.");
}
this.connectionId = negotiateResponse.connectionId;
await this.createTransport(url, this.options.transport, negotiateResponse, transferFormat);
}
@ -207,9 +270,16 @@ export class HttpConnection implements IConnection {
this.transport!.onreceive = this.onreceive;
this.transport!.onclose = (e) => this.stopConnection(e);
// only change the state if we were connecting to not overwrite
// the state if the connection is already marked as Disconnected
this.changeState(ConnectionState.Connecting, ConnectionState.Connected);
if (this.connectionState === ConnectionState.Connecting) {
// Ensure the connection transitions to the connected state prior to completing this.startInternalPromise.
// start() will handle the case when stop was called and startInternal exits still in the disconnecting state.
this.logger.log(LogLevel.Debug, "The HttpConnection connected successfully.");
this.connectionState = ConnectionState.Connected;
}
// stop() is waiting on us via this.startInternalPromise so keep this.transport around so it can clean up.
// This is the only case startInternal can exit in neither the connected nor disconnected state because stopConnection()
// will transition to the disconnected state. start() will wait for the transition using the stopPromise.
} catch (e) {
this.logger.log(LogLevel.Error, "Failed to start the connection: " + e);
this.connectionState = ConnectionState.Disconnected;
@ -262,9 +332,6 @@ export class HttpConnection implements IConnection {
this.transport = requestedTransport;
await this.transport.connect(connectUrl, requestedTransferFormat);
// only change the state if we were connecting to not overwrite
// the state if the connection is already marked as Disconnected
this.changeState(ConnectionState.Connecting, ConnectionState.Connected);
return;
}
@ -272,7 +339,6 @@ export class HttpConnection implements IConnection {
const transports = negotiateResponse.availableTransports || [];
for (const endpoint of transports) {
try {
this.connectionState = ConnectionState.Connecting;
const transport = this.resolveTransport(endpoint, requestedTransport, requestedTransferFormat);
if (typeof transport === "number") {
this.transport = this.constructTransport(transport);
@ -281,14 +347,18 @@ export class HttpConnection implements IConnection {
connectUrl = this.createConnectUrl(url, negotiateResponse.connectionId);
}
await this.transport!.connect(connectUrl, requestedTransferFormat);
this.changeState(ConnectionState.Connecting, ConnectionState.Connected);
return;
}
} catch (ex) {
this.logger.log(LogLevel.Error, `Failed to start the transport '${endpoint.transport}': ${ex}`);
this.connectionState = ConnectionState.Disconnected;
negotiateResponse.connectionId = undefined;
transportExceptions.push(`${endpoint.transport} failed: ${ex}`);
if (this.connectionState !== ConnectionState.Connecting) {
const message = "Failed to select transport before stop() was called.";
this.logger.log(LogLevel.Debug, message);
return Promise.reject(new Error(message));
}
}
}
@ -349,19 +419,30 @@ export class HttpConnection implements IConnection {
return transport && typeof (transport) === "object" && "connect" in transport;
}
private changeState(from: ConnectionState, to: ConnectionState): boolean {
if (this.connectionState === from) {
this.connectionState = to;
return true;
}
return false;
}
private stopConnection(error?: Error): void {
this.logger.log(LogLevel.Debug, `HttpConnection.stopConnection(${error}) called while in state ${this.connectionState}.`);
this.transport = undefined;
// If we have a stopError, it takes precedence over the error from the transport
error = this.stopError || error;
this.stopError = undefined;
if (this.connectionState === ConnectionState.Disconnected) {
this.logger.log(LogLevel.Debug, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection is already in the disconnected state.`);
return;
}
if (this.connectionState === ConnectionState.Connecting) {
this.logger.log(LogLevel.Warning, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection hasn't yet left the in the connecting state.`);
return;
}
if (this.connectionState === ConnectionState.Disconnecting) {
// A call to stop() induced this call to stopConnection and needs to be completed.
// Any stop() awaiters will be scheduled to continue after the onclose callback fires.
this.stopPromiseResolver();
}
if (error) {
this.logger.log(LogLevel.Error, `Connection disconnected with error '${error}'.`);
@ -371,8 +452,14 @@ export class HttpConnection implements IConnection {
this.connectionState = ConnectionState.Disconnected;
if (this.onclose) {
this.onclose(error);
if (this.onclose && this.connectionStarted) {
this.connectionStarted = false;
try {
this.onclose(error);
} catch (e) {
this.logger.log(LogLevel.Error, `HttpConnection.onclose(${error}) threw error '${e}'.`);
}
}
}

View File

@ -5,6 +5,7 @@ import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage }
import { IConnection } from "./IConnection";
import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
import { ILogger, LogLevel } from "./ILogger";
import { IReconnectPolicy } from "./IReconnectPolicy";
import { IStreamResult } from "./Stream";
import { Subject } from "./Subject";
import { Arg } from "./Utils";
@ -15,9 +16,15 @@ const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000;
/** Describes the current state of the {@link HubConnection} to the server. */
export enum HubConnectionState {
/** The hub connection is disconnected. */
Disconnected,
Disconnected = "Disconnected",
/** The hub connection is connecting. */
Connecting = "Connecting",
/** The hub connection is connected. */
Connected,
Connected = "Connected",
/** The hub connection is disconnecting. */
Disconnecting = "Disconnecting",
/** The hub connection is reconnecting. */
Reconnecting = "Reconnecting",
}
/** Represents a connection to a SignalR Hub. */
@ -25,20 +32,33 @@ export class HubConnection {
private readonly cachedPingMessage: string | ArrayBuffer;
private readonly connection: IConnection;
private readonly logger: ILogger;
private readonly reconnectPolicy?: IReconnectPolicy;
private protocol: IHubProtocol;
private handshakeProtocol: HandshakeProtocol;
private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void };
private methods: { [name: string]: Array<(...args: any[]) => void> };
private invocationId: number;
private closedCallbacks: Array<(error?: Error) => void>;
private reconnectingCallbacks: Array<(error?: Error) => void>;
private reconnectedCallbacks: Array<(connectionId?: string) => void>;
private receivedHandshakeResponse: boolean;
private handshakeResolver!: (value?: PromiseLike<{}>) => void;
private handshakeRejecter!: (reason?: any) => void;
private stopDuringStartError?: Error;
private connectionState: HubConnectionState;
// connectionStarted is tracked independently from connectionState, so we can check if the
// connection ever did successfully transition from connecting to connected before disconnecting.
private connectionStarted: boolean;
private startPromise?: Promise<void>;
private stopPromise?: Promise<void>;
// The type of these a) doesn't matter and b) varies when building in browser and node contexts
// Since we're building the WebPack bundle directly from the TypeScript, this matters (previously
// we built the bundle from the compiled JavaScript).
private reconnectDelayHandle?: any;
private timeoutHandle?: any;
private pingServerHandle?: any;
@ -61,11 +81,11 @@ export class HubConnection {
// create method that can be used by HubConnectionBuilder. An "internal" constructor would just
// be stripped away and the '.d.ts' file would have no constructor, which is interpreted as a
// public parameter-less constructor.
public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol): HubConnection {
return new HubConnection(connection, logger, protocol);
public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IReconnectPolicy): HubConnection {
return new HubConnection(connection, logger, protocol, reconnectPolicy);
}
private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol) {
private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IReconnectPolicy) {
Arg.isRequired(connection, "connection");
Arg.isRequired(logger, "logger");
Arg.isRequired(protocol, "protocol");
@ -76,6 +96,7 @@ export class HubConnection {
this.logger = logger;
this.protocol = protocol;
this.connection = connection;
this.reconnectPolicy = reconnectPolicy;
this.handshakeProtocol = new HandshakeProtocol();
this.connection.onreceive = (data: any) => this.processIncomingData(data);
@ -84,9 +105,12 @@ export class HubConnection {
this.callbacks = {};
this.methods = {};
this.closedCallbacks = [];
this.reconnectingCallbacks = [];
this.reconnectedCallbacks = [];
this.invocationId = 0;
this.receivedHandshakeResponse = false;
this.connectionState = HubConnectionState.Disconnected;
this.connectionStarted = false;
this.cachedPingMessage = this.protocol.writeMessage({ type: MessageType.Ping });
}
@ -100,16 +124,36 @@ export class HubConnection {
*
* @returns {Promise<void>} A Promise that resolves when the connection has been successfully established, or rejects with an error.
*/
public async start(): Promise<void> {
const handshakeRequest: HandshakeRequestMessage = {
protocol: this.protocol.name,
version: this.protocol.version,
};
public start(): Promise<void> {
this.startPromise = this.startWithStateTransitions();
return this.startPromise;
}
private async startWithStateTransitions(): Promise<void> {
if (this.connectionState !== HubConnectionState.Disconnected) {
return Promise.reject(new Error("Cannot start a HubConnection that is not in the 'Disconnected' state."));
}
this.connectionState = HubConnectionState.Connecting;
this.logger.log(LogLevel.Debug, "Starting HubConnection.");
try {
await this.startInternal();
this.connectionState = HubConnectionState.Connected;
this.connectionStarted = true;
this.logger.log(LogLevel.Debug, "HubConnection connected successfully.");
} catch (e) {
this.connectionState = HubConnectionState.Disconnected;
this.logger.log(LogLevel.Debug, `HubConnection failed to start successfully because of error '${e}'.`);
return Promise.reject(e);
}
}
private async startInternal() {
this.stopDuringStartError = undefined;
this.receivedHandshakeResponse = false;
// Set up the promise before any connection is started otherwise it could race with received messages
// Set up the promise before any connection is (re)started otherwise it could race with received messages
const handshakePromise = new Promise((resolve, reject) => {
this.handshakeResolver = resolve;
this.handshakeRejecter = reject;
@ -117,32 +161,96 @@ export class HubConnection {
await this.connection.start(this.protocol.transferFormat);
this.logger.log(LogLevel.Debug, "Sending handshake request.");
try {
const handshakeRequest: HandshakeRequestMessage = {
protocol: this.protocol.name,
version: this.protocol.version,
};
await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
this.logger.log(LogLevel.Debug, "Sending handshake request.");
this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
// defensively cleanup timeout in case we receive a message from the server before we finish start
this.cleanupTimeout();
this.resetTimeoutPeriod();
this.resetKeepAliveInterval();
this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
// Wait for the handshake to complete before marking connection as connected
await handshakePromise;
this.connectionState = HubConnectionState.Connected;
// defensively cleanup timeout in case we receive a message from the server before we finish start
this.cleanupTimeout();
this.resetTimeoutPeriod();
this.resetKeepAliveInterval();
await handshakePromise;
if (this.stopDuringStartError) {
throw this.stopDuringStartError;
}
} catch (e) {
this.logger.log(LogLevel.Debug, `Hub handshake failed with error '${e}' during start(). Stopping HubConnection.`);
this.cleanupTimeout();
this.cleanupPingTimer();
// HttpConnection.stop() should not complete until after the onclose callback is invoked.
// This will transition the HubConnection to the disconnected state before HttpConnection.stop() completes.
await this.connection.stop(e);
throw e;
}
}
/** Stops the connection.
*
* @returns {Promise<void>} A Promise that resolves when the connection has been successfully terminated, or rejects with an error.
*/
public stop(): Promise<void> {
public async stop(): Promise<void> {
// Capture the start promise before the connection might be restarted in an onclose callback.
const startPromise = this.startPromise;
this.stopPromise = this.stopInternal();
await this.stopPromise;
try {
// Awaiting undefined continues immediately
await startPromise;
} catch (e) {
// This exception is returned to the user as a rejected Promise from the start method.
}
}
private stopInternal(error?: Error): Promise<void> {
if (this.connectionState === HubConnectionState.Disconnected) {
this.logger.log(LogLevel.Debug, `Call to HubConnection.stop(${error}) ignored because it is already in the disconnected state.`);
return Promise.resolve();
}
if (this.connectionState === HubConnectionState.Disconnecting) {
this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`);
return this.stopPromise!;
}
this.connectionState = HubConnectionState.Disconnecting;
this.logger.log(LogLevel.Debug, "Stopping HubConnection.");
if (this.reconnectDelayHandle) {
// We're in a reconnect delay which means the underlying connection is currently already stopped.
// Just clear the handle to stop the reconnect loop (which no one is waiting on thankfully) and
// fire the onclose callbacks.
this.logger.log(LogLevel.Debug, "Connection stopped during reconnect delay. Done reconnecting.");
clearTimeout(this.reconnectDelayHandle);
this.reconnectDelayHandle = undefined;
this.completeClose();
return Promise.resolve();
}
this.cleanupTimeout();
this.cleanupPingTimer();
return this.connection.stop();
this.stopDuringStartError = error || new Error("The connection was stopped before the hub handshake could complete.");
// HttpConnection.stop() should not complete until after either HttpConnection.start() fails
// or the onclose callback is invoked. The onclose callback will transition the HubConnection
// to the disconnected state if need be before HttpConnection.stop() completes.
return this.connection.stop(error);
}
/** Invokes a streaming hub method on the server using the specified name and arguments.
@ -348,6 +456,26 @@ export class HubConnection {
}
}
/** Registers a handler that will be invoked when the connection starts reconnecting.
*
* @param {Function} callback The handler that will be invoked when the connection starts reconnecting. Optionally receives a single argument containing the error that caused the connection to start reconnecting (if any).
*/
public onreconnecting(callback: (error?: Error) => void) {
if (callback) {
this.reconnectingCallbacks.push(callback);
}
}
/** Registers a handler that will be invoked when the connection successfully reconnects.
*
* @param {Function} callback The handler that will be invoked when the connection successfully reconnects.
*/
public onreconnected(callback: (connectionId?: string) => void) {
if (callback) {
this.reconnectedCallbacks.push(callback);
}
}
private processIncomingData(data: any) {
this.cleanupTimeout();
@ -369,7 +497,7 @@ export class HubConnection {
case MessageType.StreamItem:
case MessageType.Completion:
const callback = this.callbacks[message.invocationId];
if (callback != null) {
if (callback) {
if (message.type === MessageType.Completion) {
delete this.callbacks[message.invocationId];
}
@ -383,8 +511,7 @@ export class HubConnection {
this.logger.log(LogLevel.Information, "Close message received from server.");
// We don't want to wait on the stop itself.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(message.error ? new Error("Server returned an error on close: " + message.error) : undefined);
this.stopPromise = this.stopInternal(message.error ? new Error("Server returned an error on close: " + message.error) : undefined);
break;
default:
@ -408,10 +535,6 @@ export class HubConnection {
this.logger.log(LogLevel.Error, message);
const error = new Error(message);
// We don't want to wait on the stop itself.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(error);
this.handshakeRejecter(error);
throw error;
}
@ -419,11 +542,9 @@ export class HubConnection {
const message = "Server returned handshake error: " + responseMessage.error;
this.logger.log(LogLevel.Error, message);
this.handshakeRejecter(message);
// We don't want to wait on the stop itself.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(new Error(message));
throw new Error(message);
const error = new Error(message);
this.handshakeRejecter(error);
throw error;
} else {
this.logger.log(LogLevel.Debug, "Server handshake complete.");
}
@ -456,7 +577,7 @@ export class HubConnection {
private serverTimeout() {
// The server hasn't talked to us in a while. It doesn't like us anymore ... :(
// Terminate the connection, but we don't need to wait on the promise.
// Terminate the connection, but we don't need to wait on the promise. This could trigger reconnecting.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(new Error("Server timeout elapsed without receiving a message from the server."));
}
@ -464,15 +585,19 @@ export class HubConnection {
private invokeClientMethod(invocationMessage: InvocationMessage) {
const methods = this.methods[invocationMessage.target.toLowerCase()];
if (methods) {
methods.forEach((m) => m.apply(this, invocationMessage.arguments));
try {
methods.forEach((m) => m.apply(this, invocationMessage.arguments));
} catch (e) {
this.logger.log(LogLevel.Error, `A callback for the method ${invocationMessage.target.toLowerCase()} threw error '${e}'.`);
}
if (invocationMessage.invocationId) {
// This is not supported in v1. So we return an error to avoid blocking the server waiting for the response.
const message = "Server requested a response, which is not supported in this version of the client.";
this.logger.log(LogLevel.Error, message);
// We don't need to wait on this Promise.
// tslint:disable-next-line:no-floating-promises
this.connection.stop(new Error(message));
// We don't want to wait on the stop itself.
this.stopPromise = this.stopInternal(new Error(message));
}
} else {
this.logger.log(LogLevel.Warning, `No client method with the name '${invocationMessage.target}' found.`);
@ -480,27 +605,148 @@ export class HubConnection {
}
private connectionClosed(error?: Error) {
const callbacks = this.callbacks;
this.callbacks = {};
this.logger.log(LogLevel.Debug, `HubConnection.connectionClosed(${error}) called while in state ${this.connectionState}.`);
this.connectionState = HubConnectionState.Disconnected;
// Triggering this.handshakeRejecter is insufficient because it could already be resolved without the continuation having run yet.
this.stopDuringStartError = this.stopDuringStartError || error || new Error("The underlying connection was closed before the hub handshake could complete.");
// if handshake is in progress start will be waiting for the handshake promise, so we complete it
// if it has already completed this should just noop
if (this.handshakeRejecter) {
this.handshakeRejecter(error);
// If the handshake is in progress, start will be waiting for the handshake promise, so we complete it.
// If it has already completed, this should just noop.
if (this.handshakeResolver) {
this.handshakeResolver();
}
Object.keys(callbacks)
.forEach((key) => {
const callback = callbacks[key];
callback(null, error ? error : new Error("Invocation canceled due to connection being closed."));
});
this.cancelCallbacksWithError(error || new Error("Invocation canceled due to the underlying connection being closed."));
this.cleanupTimeout();
this.cleanupPingTimer();
this.closedCallbacks.forEach((c) => c.apply(this, [error]));
if (this.connectionState === HubConnectionState.Disconnecting) {
this.completeClose(error);
} else if (this.connectionState === HubConnectionState.Connected && this.reconnectPolicy) {
// tslint:disable-next-line:no-floating-promises
this.reconnect(error);
} else if (this.connectionState === HubConnectionState.Connected) {
this.completeClose(error);
}
// If none of the above if conditions were true were called the HubConnection must be in either:
// 1. The Connecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail it.
// 2. The Reconnecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail the current reconnect attempt
// and potentially continue the reconnect() loop.
// 3. The Disconnected state in which case we're already done.
}
private completeClose(error?: Error) {
if (this.connectionStarted) {
this.connectionState = HubConnectionState.Disconnected;
this.connectionStarted = false;
try {
this.closedCallbacks.forEach((c) => c.apply(this, [error]));
} catch (e) {
this.logger.log(LogLevel.Error, `An onclose callback called with error '${error}' threw error '${e}'.`);
}
}
}
private async reconnect(error?: Error) {
const reconnectStartTime = Date.now();
let previousReconnectAttempts = 0;
let nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, 0);
if (nextRetryDelay === null) {
this.logger.log(LogLevel.Debug, "Connection not reconnecting because the IReconnectPolicy returned null on the first reconnect attempt.");
this.completeClose(error);
return;
}
this.connectionState = HubConnectionState.Reconnecting;
if (error) {
this.logger.log(LogLevel.Information, `Connection reconnecting because of error '${error}'.`);
} else {
this.logger.log(LogLevel.Information, "Connection reconnecting.");
}
if (this.onreconnecting) {
try {
this.reconnectingCallbacks.forEach((c) => c.apply(this, [error]));
} catch (e) {
this.logger.log(LogLevel.Error, `An onreconnecting callback called with error '${error}' threw error '${e}'.`);
}
// Exit early if an onreconnecting callback called connection.stop().
if (this.connectionState !== HubConnectionState.Reconnecting) {
this.logger.log(LogLevel.Debug, "Connection left the reconnecting state in onreconnecting callback. Done reconnecting.");
return;
}
}
while (nextRetryDelay !== null) {
this.logger.log(LogLevel.Information, `Reconnect attempt number ${previousReconnectAttempts} will start in ${nextRetryDelay} ms.`);
await new Promise((resolve) => {
this.reconnectDelayHandle = setTimeout(resolve, nextRetryDelay!);
});
this.reconnectDelayHandle = undefined;
if (this.connectionState !== HubConnectionState.Reconnecting) {
this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect delay. Done reconnecting.");
return;
}
try {
await this.startInternal();
this.connectionState = HubConnectionState.Connected;
this.logger.log(LogLevel.Information, "HubConnection reconnected successfully.");
if (this.onreconnected) {
try {
this.reconnectedCallbacks.forEach((c) => c.apply(this, [this.connection.connectionId]));
} catch (e) {
this.logger.log(LogLevel.Error, `An onreconnected callback called with connectionId '${this.connection.connectionId}; threw error '${e}'.`);
}
}
return;
} catch (e) {
this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`);
if (this.connectionState !== HubConnectionState.Reconnecting) {
this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting.");
return;
}
}
nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, Date.now() - reconnectStartTime);
}
this.logger.log(LogLevel.Information, `Reconnect retries have been exhausted after ${Date.now() - reconnectStartTime} ms and ${previousReconnectAttempts} failed attempts. Connection disconnecting.`);
this.completeClose();
}
private getNextRetryDelay(previousRetryCount: number, elapsedMilliseconds: number) {
try {
return this.reconnectPolicy!.nextRetryDelayInMilliseconds(previousRetryCount, elapsedMilliseconds);
} catch (e) {
this.logger.log(LogLevel.Error, `IReconnectPolicy.nextRetryDelayInMilliseconds(${previousRetryCount}, ${elapsedMilliseconds}) threw error '${e}'.`);
return null;
}
}
private cancelCallbacksWithError(error: Error) {
const callbacks = this.callbacks;
this.callbacks = {};
Object.keys(callbacks)
.forEach((key) => {
const callback = callbacks[key];
callback(null, error);
});
}
private cleanupPingTimer(): void {

View File

@ -1,11 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { DefaultReconnectPolicy } from "./DefaultReconnectPolicy";
import { HttpConnection } from "./HttpConnection";
import { HubConnection } from "./HubConnection";
import { IHttpConnectionOptions } from "./IHttpConnectionOptions";
import { IHubProtocol } from "./IHubProtocol";
import { ILogger, LogLevel } from "./ILogger";
import { IReconnectPolicy } from "./IReconnectPolicy";
import { HttpTransportType } from "./ITransport";
import { JsonHubProtocol } from "./JsonHubProtocol";
import { NullLogger } from "./Loggers";
@ -22,6 +24,10 @@ export class HubConnectionBuilder {
/** @internal */
public logger?: ILogger;
/** If defined, this indicates the client should automatically attempt to reconnect if the connection is lost. */
/** @internal */
public reconnectPolicy?: IReconnectPolicy;
/** Configures console logging for the {@link @aspnet/signalr.HubConnection}.
*
* @param {LogLevel} logLevel The minimum level of messages to log. Anything at this level, or a more severe level, will be logged.
@ -35,6 +41,7 @@ export class HubConnectionBuilder {
* @returns The {@link @aspnet/signalr.HubConnectionBuilder} instance, for chaining.
*/
public configureLogging(logger: ILogger): HubConnectionBuilder;
/** Configures custom logging for the {@link @aspnet/signalr.HubConnection}.
*
* @param {LogLevel | ILogger} logging An object implementing the {@link @aspnet/signalr.ILogger} interface or {@link @aspnet/signalr.LogLevel}.
@ -85,9 +92,10 @@ export class HubConnectionBuilder {
// Flow-typing knows where it's at. Since HttpTransportType is a number and IHttpConnectionOptions is guaranteed
// to be an object, we know (as does TypeScript) this comparison is all we need to figure out which overload was called.
if (typeof transportTypeOrOptions === "object") {
this.httpConnectionOptions = transportTypeOrOptions;
this.httpConnectionOptions = {...this.httpConnectionOptions, ...transportTypeOrOptions};
} else {
this.httpConnectionOptions = {
...this.httpConnectionOptions,
transport: transportTypeOrOptions,
};
}
@ -106,6 +114,39 @@ export class HubConnectionBuilder {
return this;
}
/** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost.
* By default, the client will wait 0, 2, 10 and 30 seconds respectively before trying up to 4 reconnect attempts.
*/
public withAutomaticReconnect(): HubConnectionBuilder;
/** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost.
*
* @param {number[]} retryDelays An array containing the delays in milliseconds before trying each reconnect attempt.
* The length of the array represents how many failed reconnect attempts it takes before the client will stop attempting to reconnect.
*/
public withAutomaticReconnect(retryDelays: number[]): HubConnectionBuilder;
/** Configures the {@link @aspnet/signalr.HubConnection} to automatically attempt to reconnect if the connection is lost.
*
* @param {number[]} reconnectPolicy An {@link @aspnet/signalR.IReconnectPolicy} that controls the timing and number of reconnect attempts.
*/
public withAutomaticReconnect(reconnectPolicy: IReconnectPolicy): HubConnectionBuilder;
public withAutomaticReconnect(retryDelaysOrReconnectPolicy?: number[] | IReconnectPolicy): HubConnectionBuilder {
if (this.reconnectPolicy) {
throw new Error("A reconnectPolicy has already been set.");
}
if (!retryDelaysOrReconnectPolicy) {
this.reconnectPolicy = new DefaultReconnectPolicy();
} else if (Array.isArray(retryDelaysOrReconnectPolicy)) {
this.reconnectPolicy = new DefaultReconnectPolicy(retryDelaysOrReconnectPolicy);
} else {
this.reconnectPolicy = retryDelaysOrReconnectPolicy;
}
return this;
}
/** Creates a {@link @aspnet/signalr.HubConnection} from the configuration options specified in this builder.
*
* @returns {HubConnection} The configured {@link @aspnet/signalr.HubConnection}.
@ -130,7 +171,8 @@ export class HubConnectionBuilder {
return HubConnection.create(
connection,
this.logger || NullLogger.instance,
this.protocol || new JsonHubProtocol());
this.protocol || new JsonHubProtocol(),
this.reconnectPolicy);
}
}

View File

@ -6,6 +6,7 @@ import { TransferFormat } from "./ITransport";
/** @private */
export interface IConnection {
readonly features: any;
readonly connectionId?: string;
start(transferFormat: TransferFormat): Promise<void>;
send(data: string | ArrayBuffer): Promise<void>;

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
/** An abstraction that controls when the client attempts to reconnect and how many times it does so. */
export interface IReconnectPolicy {
/** Called after the transport loses the connection.
*
* @param {number} previousRetryCount The number of consecutive failed reconnect attempts so far.
*
* @param {number} elapsedMilliseconds The amount of time in milliseconds spent reconnecting so far.
*
* @returns {number | null} The amount of time in milliseconds to wait before the next reconnect attempt. `null` tells the client to stop retrying and close.
*/
nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number): number | null;
}

View File

@ -21,3 +21,4 @@ export { IStreamSubscriber, IStreamResult, ISubscription } from "./Stream";
export { NullLogger } from "./Loggers";
export { JsonHubProtocol } from "./JsonHubProtocol";
export { Subject } from "./Subject";
export { IReconnectPolicy } from "./IReconnectPolicy";

View File

@ -65,14 +65,10 @@ describe("HttpConnection", () => {
it("cannot start a running connection", async () => {
await VerifyLogger.run(async (logger) => {
const negotiating = new PromiseSource();
const options: IHttpConnectionOptions = {
...commonOptions,
httpClient: new TestHttpClient()
.on("POST", () => {
negotiating.resolve();
return defaultNegotiateResponse;
}),
.on("POST", () => defaultNegotiateResponse),
logger,
transport: {
connect() {
@ -95,8 +91,9 @@ describe("HttpConnection", () => {
await expect(connection.start(TransferFormat.Text))
.rejects
.toThrow("Cannot start a connection that is not in the 'Disconnected' state.");
.toThrow("Cannot start an HttpConnection that is not in the 'Disconnected' state.");
} finally {
(options.transport as ITransport).onclose!();
await connection.stop();
}
});
@ -146,8 +143,11 @@ describe("HttpConnection", () => {
const connection = new HttpConnection("http://tempuri.org", options);
await connection.start(TransferFormat.Text);
});
await expect(connection.start(TransferFormat.Text))
.rejects
.toThrow("The connection was stopped during negotiation.");
},
"Failed to start the connection: Error: The connection was stopped during negotiation.");
});
it("cannot send with an un-started connection", async () => {
@ -277,6 +277,7 @@ describe("HttpConnection", () => {
await startPromise;
} finally {
(options.transport as ITransport).onclose!();
await connection.stop();
}
});
@ -306,7 +307,7 @@ describe("HttpConnection", () => {
expect(await negotiateUrl).toBe(expectedUrl);
await expect(startPromise).rejects;
await expect(startPromise).rejects.toThrow("We don't care how this turns out");
} finally {
await connection.stop();
}
@ -805,7 +806,7 @@ describe("HttpConnection", () => {
// Force TypeScript to let us call start incorrectly
const connection: any = new HttpConnection("http://tempuri.org", { ...commonOptions, logger });
expect(() => connection.start(42)).toThrowError("Unknown transferFormat value: 42.");
await expect(connection.start(42)).rejects.toThrow("Unknown transferFormat value: 42.");
});
});

View File

@ -0,0 +1,702 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy";
import { HubConnection, HubConnectionState } from "../src/HubConnection";
import { JsonHubProtocol } from "../src/JsonHubProtocol";
import { VerifyLogger } from "./Common";
import { TestConnection } from "./TestConnection";
import { PromiseSource } from "./Utils";
describe("auto reconnect", () => {
it("is not enabled by default", async () => {
await VerifyLogger.run(async (logger) => {
const closePromise = new PromiseSource();
let onreconnectingCalled = false;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol());
hubConnection.onclose(() => {
closePromise.resolve();
});
hubConnection.onreconnecting(() => {
onreconnectingCalled = true;
});
await hubConnection.start();
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await closePromise;
expect(onreconnectingCalled).toBe(false);
});
});
it("can be opted into", async () => {
await VerifyLogger.run(async (logger) => {
const reconnectedPromise = new PromiseSource();
let nextRetryDelayCalledPromise = new PromiseSource();
let continueRetryingPromise = new PromiseSource();
let lastRetryCount = -1;
let lastElapsedMs = -1;
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) {
lastRetryCount = previousRetryCount;
lastElapsedMs = elapsedMilliseconds;
nextRetryDelayCalledPromise.resolve();
return 0;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
reconnectedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
await hubConnection.start();
connection.start = () => {
const promise = continueRetryingPromise;
continueRetryingPromise = new PromiseSource();
return promise;
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
nextRetryDelayCalledPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
// Make sure the the Promise is "handled" immediately upon rejection or else this test fails.
continueRetryingPromise.catch(() => { });
continueRetryingPromise.reject(new Error("Reconnect attempt failed"));
await nextRetryDelayCalledPromise;
expect(lastRetryCount).toBe(1);
expect(lastElapsedMs).toBeGreaterThanOrEqual(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
continueRetryingPromise.resolve();
await reconnectedPromise;
expect(hubConnection.state).toBe(HubConnectionState.Connected);
expect(lastRetryCount).toBe(1);
expect(lastElapsedMs).toBeGreaterThanOrEqual(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(0);
await hubConnection.stop();
expect(lastRetryCount).toBe(1);
expect(lastElapsedMs).toBeGreaterThanOrEqual(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(1);
});
});
it("stops if the reconnect policy returns null", async () => {
await VerifyLogger.run(async (logger) => {
const closePromise = new PromiseSource();
let nextRetryDelayCalledPromise = new PromiseSource();
let lastRetryCount = -1;
let lastElapsedMs = -1;
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) {
lastRetryCount = previousRetryCount;
lastElapsedMs = elapsedMilliseconds;
nextRetryDelayCalledPromise.resolve();
return previousRetryCount === 0 ? 0 : null;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
});
hubConnection.onclose(() => {
closeCount++;
closePromise.resolve();
});
await hubConnection.start();
connection.start = () => {
return Promise.reject("Reconnect attempt failed");
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
nextRetryDelayCalledPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
expect(lastRetryCount).toBe(1);
expect(lastElapsedMs).toBeGreaterThanOrEqual(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(1);
});
});
it("can reconnect multiple times", async () => {
await VerifyLogger.run(async (logger) => {
let reconnectedPromise = new PromiseSource();
let nextRetryDelayCalledPromise = new PromiseSource();
let lastRetryCount = -1;
let lastElapsedMs = -1;
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds(previousRetryCount: number, elapsedMilliseconds: number) {
lastRetryCount = previousRetryCount;
lastElapsedMs = elapsedMilliseconds;
nextRetryDelayCalledPromise.resolve();
return 0;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
reconnectedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
await hubConnection.start();
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
nextRetryDelayCalledPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await reconnectedPromise;
reconnectedPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Connected);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(0);
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(2);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(0);
await reconnectedPromise;
expect(hubConnection.state).toBe(HubConnectionState.Connected);
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(2);
expect(onreconnectedCount).toBe(2);
expect(closeCount).toBe(0);
await hubConnection.stop();
expect(lastRetryCount).toBe(0);
expect(lastElapsedMs).toBe(0);
expect(onreconnectingCount).toBe(2);
expect(onreconnectedCount).toBe(2);
expect(closeCount).toBe(1);
});
});
it("does not transition into the reconnecting state if the first retry delay is null", async () => {
await VerifyLogger.run(async (logger) => {
const closePromise = new PromiseSource();
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
// Note the [] parameter to the DefaultReconnectPolicy.
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy([]));
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
});
hubConnection.onclose(() => {
closeCount++;
closePromise.resolve();
});
await hubConnection.start();
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await closePromise;
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
expect(onreconnectingCount).toBe(0);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(1);
});
});
it("does not transition into the reconnecting state if the connection is lost during initial handshake", async () => {
await VerifyLogger.run(async (logger) => {
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
// Disable autoHandshake in TestConnection
const connection = new TestConnection(false);
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy());
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
});
hubConnection.onclose(() => {
closeCount++;
});
const startPromise = hubConnection.start();
expect(hubConnection.state).toBe(HubConnectionState.Connecting);
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await expect(startPromise).rejects.toThrow("Connection lost");
expect(onreconnectingCount).toBe(0);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
});
});
it("continues reconnecting state if the connection is lost during a reconnecting handshake", async () => {
await VerifyLogger.run(async (logger) => {
const reconnectedPromise = new PromiseSource();
let nextRetryDelayCalledPromise = new PromiseSource();
let lastRetryCount = 0;
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
// Disable autoHandshake in TestConnection
const connection = new TestConnection(false);
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds(previousRetryCount: number) {
lastRetryCount = previousRetryCount;
nextRetryDelayCalledPromise.resolve();
return 0;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
reconnectedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
const startPromise = hubConnection.start();
// Manually complete handshake.
connection.receive({});
await startPromise;
let replacedStartCalledPromise = new PromiseSource();
connection.start = () => {
replacedStartCalledPromise.resolve();
return Promise.resolve();
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
nextRetryDelayCalledPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await replacedStartCalledPromise;
replacedStartCalledPromise = new PromiseSource();
// Fail underlying connection during reconnect during handshake
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await replacedStartCalledPromise;
// Manually complete handshake.
connection.receive({});
await reconnectedPromise;
expect(hubConnection.state).toBe(HubConnectionState.Connected);
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(0);
await hubConnection.stop();
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(1);
});
});
it("continues reconnecting state if invalid handshake response received", async () => {
await VerifyLogger.run(async (logger) => {
const reconnectedPromise = new PromiseSource();
let nextRetryDelayCalledPromise = new PromiseSource();
let lastRetryCount = 0;
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
// Disable autoHandshake in TestConnection
const connection = new TestConnection(false);
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds(previousRetryCount: number) {
lastRetryCount = previousRetryCount;
nextRetryDelayCalledPromise.resolve();
return 0;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
reconnectedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
const startPromise = hubConnection.start();
// Manually complete handshake.
connection.receive({});
await startPromise;
let replacedStartCalledPromise = new PromiseSource();
connection.start = () => {
replacedStartCalledPromise.resolve();
return Promise.resolve();
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
nextRetryDelayCalledPromise = new PromiseSource();
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(0);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await replacedStartCalledPromise;
replacedStartCalledPromise = new PromiseSource();
// Manually fail handshake
expect(() => connection.receive({ error: "invalid" })).toThrow("invalid");
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await replacedStartCalledPromise;
// Manually complete handshake.
connection.receive({});
await reconnectedPromise;
expect(hubConnection.state).toBe(HubConnectionState.Connected);
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(0);
await hubConnection.stop();
expect(lastRetryCount).toBe(1);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(1);
expect(closeCount).toBe(1);
},
"Server returned handshake error: invalid");
});
it("can be stopped while restarting the underlying connection", async () => {
await VerifyLogger.run(async (logger) => {
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy([0]));
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
});
hubConnection.onclose(() => {
closeCount++;
});
await hubConnection.start();
const stopCalledPromise = new PromiseSource();
let stopPromise: Promise<void>;
connection.start = () => {
stopCalledPromise.resolve();
stopPromise = hubConnection.stop();
return Promise.resolve();
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await stopCalledPromise;
await stopPromise!;
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(1);
});
});
it("can be stopped during a restart handshake", async () => {
await VerifyLogger.run(async (logger) => {
const closedPromise = new PromiseSource();
const nextRetryDelayCalledPromise = new PromiseSource();
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
// Disable autoHandshake in TestConnection
const connection = new TestConnection(false);
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds() {
nextRetryDelayCalledPromise.resolve();
return 0;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
closedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
const startPromise = hubConnection.start();
// Manually complete handshake.
connection.receive({});
await startPromise;
const replacedSendCalledPromise = new PromiseSource();
connection.send = () => {
replacedSendCalledPromise.resolve();
return Promise.resolve();
};
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
// Wait for the handshake to actually started. Right now, we're awaiting the 0ms delay.
await replacedSendCalledPromise;
await hubConnection.stop();
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(1);
});
});
it("can be stopped during a reconnect delay", async () => {
await VerifyLogger.run(async (logger) => {
const closedPromise = new PromiseSource();
const nextRetryDelayCalledPromise = new PromiseSource();
let onreconnectingCount = 0;
let onreconnectedCount = 0;
let closeCount = 0;
const connection = new TestConnection();
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
nextRetryDelayInMilliseconds() {
nextRetryDelayCalledPromise.resolve();
// 60s is hopefully longer than this test could ever take.
return 60 * 1000;
},
});
hubConnection.onreconnecting(() => {
onreconnectingCount++;
});
hubConnection.onreconnected(() => {
onreconnectedCount++;
closedPromise.resolve();
});
hubConnection.onclose(() => {
closeCount++;
});
await hubConnection.start();
// Typically this would be called by the transport
connection.onclose!(new Error("Connection lost"));
await nextRetryDelayCalledPromise;
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(0);
await hubConnection.stop();
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
expect(onreconnectingCount).toBe(1);
expect(onreconnectedCount).toBe(0);
expect(closeCount).toBe(1);
});
});
});

View File

@ -13,6 +13,7 @@ import { Subject } from "../src/Subject";
import { TextMessageFormat } from "../src/TextMessageFormat";
import { VerifyLogger } from "./Common";
import { TestConnection } from "./TestConnection";
import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils";
function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) {
@ -22,7 +23,6 @@ function createHubConnection(connection: IConnection, logger?: ILogger | null, p
registerUnhandledRejectionHandler();
describe("HubConnection", () => {
describe("start", () => {
it("sends handshake message", async () => {
await VerifyLogger.run(async (logger) => {
@ -462,7 +462,7 @@ describe("HubConnection", () => {
const invokePromise = hubConnection.invoke("testMethod");
await hubConnection.stop();
await expect(invokePromise).rejects.toThrow("Invocation canceled due to connection being closed.");
await expect(invokePromise).rejects.toThrow("Invocation canceled due to the underlying connection being closed.");
});
});
@ -691,9 +691,9 @@ describe("HubConnection", () => {
}
await expect(startPromise)
.rejects
.toBe("Server returned handshake error: Error!");
.toThrow("Server returned handshake error: Error!");
expect(closeError!.message).toEqual("Server returned handshake error: Error!");
expect(closeError).toEqual(undefined);
} finally {
await hubConnection.stop();
}
@ -962,7 +962,7 @@ describe("HubConnection", () => {
.subscribe(observer);
await hubConnection.stop();
await expect(observer.completed).rejects.toThrow("Error: Invocation canceled due to connection being closed.");
await expect(observer.completed).rejects.toThrow("Error: Invocation canceled due to the underlying connection being closed.");
} finally {
await hubConnection.stop();
}
@ -1095,6 +1095,8 @@ describe("HubConnection", () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
await hubConnection.start();
try {
let invocations = 0;
hubConnection.onclose((e) => invocations++);
@ -1112,6 +1114,8 @@ describe("HubConnection", () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
await hubConnection.start();
try {
let error: Error | undefined;
hubConnection.onclose((e) => error = e);
@ -1133,6 +1137,7 @@ describe("HubConnection", () => {
hubConnection.onclose(null!);
hubConnection.onclose(undefined!);
// Typically this would be called by the transport
(hubConnection as any).connectionState = HubConnectionState.Connected;
connection.onclose!();
// expect no errors
} finally {
@ -1145,6 +1150,8 @@ describe("HubConnection", () => {
await VerifyLogger.run(async (logger) => {
const connection = new TestConnection();
const hubConnection = createHubConnection(connection, logger);
await hubConnection.start();
try {
let state: HubConnectionState | undefined;
hubConnection.onclose((e) => state = hubConnection.state);
@ -1235,77 +1242,6 @@ async function pingAndWait(connection: TestConnection): Promise<void> {
await delayUntil(50);
}
class TestConnection implements IConnection {
public readonly features: any = {};
public onreceive: ((data: string | ArrayBuffer) => void) | null;
public onclose: ((error?: Error) => void) | null;
public sentData: any[];
public lastInvocationId: string | null;
private autoHandshake: boolean | null;
constructor(autoHandshake: boolean = true) {
this.onreceive = null;
this.onclose = null;
this.sentData = [];
this.lastInvocationId = null;
this.autoHandshake = autoHandshake;
}
public start(): Promise<void> {
return Promise.resolve();
}
public send(data: any): Promise<void> {
const invocation = TextMessageFormat.parse(data)[0];
const parsedInvocation = JSON.parse(invocation);
const invocationId = parsedInvocation.invocationId;
if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) {
this.receiveHandshakeResponse();
}
if (invocationId) {
this.lastInvocationId = invocationId;
}
if (this.sentData) {
this.sentData.push(invocation);
} else {
this.sentData = [invocation];
}
return Promise.resolve();
}
public stop(error?: Error): Promise<void> {
if (this.onclose) {
this.onclose(error);
}
return Promise.resolve();
}
public receiveHandshakeResponse(error?: string): void {
this.receive({ error });
}
public receive(data: any): void {
const payload = JSON.stringify(data);
this.invokeOnReceive(TextMessageFormat.write(payload));
}
public receiveText(data: string) {
this.invokeOnReceive(data);
}
public receiveBinary(data: ArrayBuffer) {
this.invokeOnReceive(data);
}
private invokeOnReceive(data: string | ArrayBuffer) {
if (this.onreceive) {
this.onreceive(data);
}
}
}
class TestProtocol implements IHubProtocol {
public readonly name: string = "TestProtocol";
public readonly version: number = 1;

View File

@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy";
import { HttpRequest, HttpResponse } from "../src/HttpClient";
import { HubConnection } from "../src/HubConnection";
import { HubConnection, HubConnectionState } from "../src/HubConnection";
import { HubConnectionBuilder } from "../src/HubConnectionBuilder";
import { IHttpConnectionOptions } from "../src/IHttpConnectionOptions";
import { HubMessage, IHubProtocol } from "../src/IHubProtocol";
@ -63,18 +64,10 @@ describe("HubConnectionBuilder", () => {
})
.build();
// Start the connection
const closed = makeClosedPromise(connection);
await expect(connection.start()).rejects.toThrow("The underlying connection was closed before the hub handshake could complete.");
expect(connection.state).toBe(HubConnectionState.Disconnected);
const startPromise = connection.start();
const pollRequest = await pollSent.promise;
expect(pollRequest.url).toMatch(/http:\/\/example.com\?id=abc123.*/);
await closed;
try {
await startPromise;
} catch { }
expect((await pollSent.promise).url).toMatch(/http:\/\/example.com\?id=abc123.*/);
});
});
@ -93,11 +86,11 @@ describe("HubConnectionBuilder", () => {
const pollSent = new PromiseSource<HttpRequest>();
const pollCompleted = new PromiseSource<HttpResponse>();
const negotiateReceived = new PromiseSource<HttpRequest>();
let negotiateRequest!: HttpRequest;
const testClient = createTestClient(pollSent, pollCompleted.promise)
.on("POST", "http://example.com?id=abc123", (req) => {
// Respond from the poll with the handshake response
negotiateReceived.resolve(req);
negotiateRequest = req;
pollCompleted.resolve(new HttpResponse(204, "No Content", "{}"));
return new HttpResponse(202);
});
@ -111,18 +104,10 @@ describe("HubConnectionBuilder", () => {
.withHubProtocol(protocol)
.build();
// Start the connection
const closed = makeClosedPromise(connection);
await expect(connection.start()).rejects.toThrow("The underlying connection was closed before the hub handshake could complete.");
expect(connection.state).toBe(HubConnectionState.Disconnected);
const startPromise = connection.start();
const negotiateRequest = await negotiateReceived.promise;
expect(negotiateRequest.content).toBe(`{"protocol":"${protocol.name}","version":1}\x1E`);
await closed;
try {
await startPromise;
} catch { }
});
});
@ -219,6 +204,54 @@ describe("HubConnectionBuilder", () => {
expect(httpConnectionLogger.messages).toContain("Starting connection with transfer format 'Text'.");
expect(hubConnectionLogger.messages).not.toContain("Starting connection with transfer format 'Text'.");
});
it("reconnectPolicy undefined by default", () => {
const builder = new HubConnectionBuilder().withUrl("http://example.com");
expect(builder.reconnectPolicy).toBeUndefined();
});
it("withAutomaticReconnect throws if reconnectPolicy is already set", () => {
const builder = new HubConnectionBuilder().withAutomaticReconnect();
expect(() => builder.withAutomaticReconnect()).toThrow("A reconnectPolicy has already been set.");
});
it("withAutomaticReconnect uses default retryDelays when called with no arguments", () => {
// From DefaultReconnectPolicy.ts
const DEFAULT_RETRY_DELAYS_IN_MILLISECONDS = [0, 2000, 10000, 30000, null];
const builder = new HubConnectionBuilder()
.withAutomaticReconnect();
let retryCount = 0;
for (const delay of DEFAULT_RETRY_DELAYS_IN_MILLISECONDS) {
expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay);
}
});
it("withAutomaticReconnect uses custom retryDelays when provided", () => {
const customRetryDelays = [ 3, 1, 4, 1, 5, 9 ];
const builder = new HubConnectionBuilder()
.withAutomaticReconnect(customRetryDelays);
let retryCount = 0;
for (const delay of customRetryDelays) {
expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay);
}
expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount, 0)).toBe(null);
});
it("withAutomaticReconnect uses a custom IReconnectPolicy when provided", () => {
const customRetryDelays = [ 127, 0, 0, 1 ];
const builder = new HubConnectionBuilder()
.withAutomaticReconnect(new DefaultReconnectPolicy(customRetryDelays));
let retryCount = 0;
for (const delay of customRetryDelays) {
expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount++, 0)).toBe(delay);
}
expect(builder.reconnectPolicy!.nextRetryDelayInMilliseconds(retryCount, 0)).toBe(null);
});
});
class CaptureLogger implements ILogger {
@ -263,18 +296,6 @@ function createTestClient(pollSent: PromiseSource<HttpRequest>, pollCompleted: P
});
}
function makeClosedPromise(connection: HubConnection): Promise<void> {
const closed = new PromiseSource();
connection.onclose((error) => {
if (error) {
closed.reject(error);
} else {
closed.resolve();
}
});
return closed.promise;
}
function eachMissingValue(callback: (val: undefined | null, name: string) => void) {
callback(null, "null");
callback(undefined, "undefined");

View File

@ -0,0 +1,78 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { IConnection } from "../src/IConnection";
import { TextMessageFormat } from "../src/TextMessageFormat";
export class TestConnection implements IConnection {
public readonly features: any = {};
public connectionId?: string;
public onreceive: ((data: string | ArrayBuffer) => void) | null;
public onclose: ((error?: Error) => void) | null;
public sentData: any[];
public lastInvocationId: string | null;
private autoHandshake: boolean | null;
constructor(autoHandshake: boolean = true) {
this.onreceive = null;
this.onclose = null;
this.sentData = [];
this.lastInvocationId = null;
this.autoHandshake = autoHandshake;
}
public start(): Promise<void> {
return Promise.resolve();
}
public send(data: any): Promise<void> {
const invocation = TextMessageFormat.parse(data)[0];
const parsedInvocation = JSON.parse(invocation);
const invocationId = parsedInvocation.invocationId;
if (parsedInvocation.protocol && parsedInvocation.version && this.autoHandshake) {
this.receiveHandshakeResponse();
}
if (invocationId) {
this.lastInvocationId = invocationId;
}
if (this.sentData) {
this.sentData.push(invocation);
} else {
this.sentData = [invocation];
}
return Promise.resolve();
}
public stop(error?: Error): Promise<void> {
if (this.onclose) {
this.onclose(error);
}
return Promise.resolve();
}
public receiveHandshakeResponse(error?: string): void {
this.receive({ error });
}
public receive(data: any): void {
const payload = JSON.stringify(data);
this.invokeOnReceive(TextMessageFormat.write(payload));
}
public receiveText(data: string) {
this.invokeOnReceive(data);
}
public receiveBinary(data: ArrayBuffer) {
this.invokeOnReceive(data);
}
private invokeOnReceive(data: string | ArrayBuffer) {
if (this.onreceive) {
this.onreceive(data);
}
}
}

View File

@ -25,12 +25,18 @@ export class TestEventSource {
return this._onopen!;
}
public static eventSourceSet: PromiseSource;
public static eventSource: TestEventSource;
public closed: boolean = false;
constructor(url: string, eventSourceInitDict?: EventSourceInit) {
this.url = url;
TestEventSource.eventSource = this;
if (TestEventSource.eventSourceSet) {
TestEventSource.eventSourceSet.resolve();
}
}
public close(): void {

View File

@ -7,13 +7,13 @@ export class TestWebSocket {
public binaryType: "blob" | "arraybuffer" = "blob";
public bufferedAmount: number = 0;
public extensions: string = "";
public onclose!: ((this: WebSocket, ev: CloseEvent) => any);
public onerror!: ((this: WebSocket, ev: Event) => any);
public onmessage!: ((this: WebSocket, ev: MessageEvent) => any);
public protocol: string;
public readyState: number = 1;
public url: string;
public static webSocketSet: PromiseSource;
public static webSocket: TestWebSocket;
public receivedData: Array<(string | ArrayBuffer | Blob | ArrayBufferView)>;
@ -29,6 +29,18 @@ export class TestWebSocket {
return this._onopen!;
}
// tslint:disable-next-line:variable-name
private _onclose?: (this: WebSocket, evt: Event) => any;
public closeSet: PromiseSource = new PromiseSource();
public set onclose(value: (this: WebSocket, evt: Event) => any) {
this._onclose = value;
this.closeSet.resolve();
}
public get onclose(): (this: WebSocket, evt: Event) => any {
return this._onclose!;
}
public close(code?: number | undefined, reason?: string | undefined): void {
const closeEvent = new TestCloseEvent();
closeEvent.code = code || 1000;
@ -58,8 +70,13 @@ export class TestWebSocket {
constructor(url: string, protocols?: string | string[]) {
this.url = url;
this.protocol = protocols ? (typeof protocols === "string" ? protocols : protocols[0]) : "";
TestWebSocket.webSocket = this;
this.receivedData = [];
TestWebSocket.webSocket = this;
if (TestWebSocket.webSocketSet) {
TestWebSocket.webSocketSet.resolve();
}
}
public readonly CLOSED: number = 1;

View File

@ -53,7 +53,7 @@ describe("WebSocketTransport", () => {
connectComplete = true;
})();
await TestWebSocket.webSocket.openSet;
await TestWebSocket.webSocket.closeSet;
expect(connectComplete).toBe(false);

View File

@ -29,6 +29,9 @@
<input type="button" id="connect" value="Connect" />
<input type="button" id="disconnect" value="Disconnect" />
<input type="checkbox" id="autoReconnect" checked />
Enable automatic reconnects
</div>
@ -105,6 +108,7 @@
let connectButton = document.getElementById('connect');
let disconnectButton = document.getElementById('disconnect');
let autoReconnectCheckbox = document.getElementById('autoReconnect');
let broadcastButton = document.getElementById('broadcast');
let broadcastExceptMeButton = document.getElementById('broadcast-exceptme');
let sendToConnectionButton = document.getElementById('connection-send');
@ -145,11 +149,18 @@
}
console.log('http://' + document.location.host + '/' + hubRoute);
connection = new signalR.HubConnectionBuilder()
var connectionBuilder = new signalR.HubConnectionBuilder()
.configureLogging(signalR.LogLevel.Trace)
.withUrl(hubRoute, options)
.withHubProtocol(protocol)
.build();
.withHubProtocol(protocol);
if (autoReconnectCheckbox.checked) {
connectionBuilder.withAutomaticReconnect();
}
connection = connectionBuilder.build();
connection.on('Send', function (msg) {
addLine('message-list', msg);
});
@ -164,6 +175,14 @@
updateButtonState(false);
});
connection.onreconnecting(function (e) {
addLine('message-list', 'Connection reconnecting: ' + e, 'orange');
});
connection.onreconnected(function (e) {
addLine('message-list', 'Connection reconnected!', 'green');
});
connection.start()
.then(function () {
isConnected = true;