parent
9facd9b193
commit
00c3e7cd85
|
|
@ -51,7 +51,7 @@ export class HttpConnection implements IConnection {
|
|||
private transport?: ITransport;
|
||||
private startInternalPromise?: Promise<void>;
|
||||
private stopPromise?: Promise<void>;
|
||||
private stopPromiseResolver!: (value?: PromiseLike<void>) => void;
|
||||
private stopPromiseResolver: (value?: PromiseLike<void>) => void = () => {};
|
||||
private stopError?: Error;
|
||||
private accessTokenFactory?: () => string | Promise<string>;
|
||||
private sendQueue?: TransportSendQueue;
|
||||
|
|
@ -214,7 +214,6 @@ export class HttpConnection implements IConnection {
|
|||
this.transport = undefined;
|
||||
} else {
|
||||
this.logger.log(LogLevel.Debug, "HttpConnection.transport is undefined in HttpConnection.stop() because start() failed.");
|
||||
this.stopConnection();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -294,6 +293,9 @@ export class HttpConnection implements IConnection {
|
|||
this.logger.log(LogLevel.Error, "Failed to start the connection: " + e);
|
||||
this.connectionState = ConnectionState.Disconnected;
|
||||
this.transport = undefined;
|
||||
|
||||
// if start fails, any active calls to stop assume that start will complete the stop promise
|
||||
this.stopPromiseResolver();
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -766,7 +766,11 @@ export class HubConnection {
|
|||
this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`);
|
||||
|
||||
if (this.connectionState !== HubConnectionState.Reconnecting) {
|
||||
this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting.");
|
||||
this.logger.log(LogLevel.Debug, `Connection moved to the '${this.connectionState}' from the reconnecting state during reconnect attempt. Done reconnecting.`);
|
||||
// The TypeScript compiler thinks that connectionState must be Connected here. The TypeScript compiler is wrong.
|
||||
if (this.connectionState as any === HubConnectionState.Disconnecting) {
|
||||
this.completeClose();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -134,23 +134,30 @@ describe("HttpConnection", () => {
|
|||
|
||||
it("can stop a starting connection", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
const stoppingPromise = new PromiseSource();
|
||||
const startingPromise = new PromiseSource();
|
||||
const options: IHttpConnectionOptions = {
|
||||
...commonOptions,
|
||||
httpClient: new TestHttpClient()
|
||||
.on("POST", async () => {
|
||||
await connection.stop();
|
||||
startingPromise.resolve();
|
||||
await stoppingPromise;
|
||||
return "{}";
|
||||
})
|
||||
.on("GET", async () => {
|
||||
await connection.stop();
|
||||
return "";
|
||||
}),
|
||||
logger,
|
||||
} as IHttpConnectionOptions;
|
||||
|
||||
const connection = new HttpConnection("http://tempuri.org", options);
|
||||
|
||||
await expect(connection.start(TransferFormat.Text))
|
||||
const startPromise = connection.start(TransferFormat.Text);
|
||||
|
||||
await startingPromise;
|
||||
const stopPromise = connection.stop();
|
||||
stoppingPromise.resolve();
|
||||
|
||||
await stopPromise;
|
||||
|
||||
await expect(startPromise)
|
||||
.rejects
|
||||
.toThrow("The connection was stopped during negotiation.");
|
||||
},
|
||||
|
|
|
|||
|
|
@ -2,13 +2,17 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
import { DefaultReconnectPolicy } from "../src/DefaultReconnectPolicy";
|
||||
import { HttpConnection, INegotiateResponse } from "../src/HttpConnection";
|
||||
import { HubConnection, HubConnectionState } from "../src/HubConnection";
|
||||
import { IHttpConnectionOptions } from "../src/IHttpConnectionOptions";
|
||||
import { MessageType } from "../src/IHubProtocol";
|
||||
import { RetryContext } from "../src/IRetryPolicy";
|
||||
import { JsonHubProtocol } from "../src/JsonHubProtocol";
|
||||
|
||||
import { VerifyLogger } from "./Common";
|
||||
import { TestConnection } from "./TestConnection";
|
||||
import { TestHttpClient } from "./TestHttpClient";
|
||||
import { TestEvent, TestMessageEvent, TestWebSocket } from "./TestWebSocket";
|
||||
import { PromiseSource } from "./Utils";
|
||||
|
||||
describe("auto reconnect", () => {
|
||||
|
|
@ -785,4 +789,93 @@ describe("auto reconnect", () => {
|
|||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("can be stopped while restarting the underlying connection and negotiate throws", async () => {
|
||||
await VerifyLogger.run(async (logger) => {
|
||||
let onreconnectingCount = 0;
|
||||
let onreconnectedCount = 0;
|
||||
let closeCount = 0;
|
||||
|
||||
const nextRetryDelayCalledPromise = new PromiseSource();
|
||||
|
||||
const defaultConnectionId = "abc123";
|
||||
const defaultConnectionToken = "123abc";
|
||||
const defaultNegotiateResponse: INegotiateResponse = {
|
||||
availableTransports: [
|
||||
{ transport: "WebSockets", transferFormats: ["Text", "Binary"] },
|
||||
{ transport: "ServerSentEvents", transferFormats: ["Text"] },
|
||||
{ transport: "LongPolling", transferFormats: ["Text", "Binary"] },
|
||||
],
|
||||
connectionId: defaultConnectionId,
|
||||
connectionToken: defaultConnectionToken,
|
||||
negotiateVersion: 1,
|
||||
};
|
||||
|
||||
const startStarted = new PromiseSource();
|
||||
let negotiateCount = 0;
|
||||
|
||||
const options: IHttpConnectionOptions = {
|
||||
WebSocket: TestWebSocket,
|
||||
httpClient: new TestHttpClient()
|
||||
.on("POST", async () => {
|
||||
++negotiateCount;
|
||||
if (negotiateCount === 1) {
|
||||
return defaultNegotiateResponse;
|
||||
}
|
||||
startStarted.resolve();
|
||||
return Promise.reject("Error with negotiate");
|
||||
})
|
||||
.on("GET", () => ""),
|
||||
logger,
|
||||
} as IHttpConnectionOptions;
|
||||
|
||||
const connection = new HttpConnection("http://tempuri.org", options);
|
||||
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), {
|
||||
nextRetryDelayInMilliseconds() {
|
||||
nextRetryDelayCalledPromise.resolve();
|
||||
return 0;
|
||||
},
|
||||
});
|
||||
|
||||
hubConnection.onreconnecting(() => {
|
||||
onreconnectingCount++;
|
||||
});
|
||||
|
||||
hubConnection.onreconnected(() => {
|
||||
onreconnectedCount++;
|
||||
});
|
||||
|
||||
hubConnection.onclose(() => {
|
||||
closeCount++;
|
||||
});
|
||||
|
||||
TestWebSocket.webSocketSet = new PromiseSource();
|
||||
const startPromise = hubConnection.start();
|
||||
await TestWebSocket.webSocketSet;
|
||||
await TestWebSocket.webSocket.openSet;
|
||||
TestWebSocket.webSocket.onopen(new TestEvent());
|
||||
TestWebSocket.webSocket.onmessage(new TestMessageEvent("{}\x1e"));
|
||||
|
||||
await startPromise;
|
||||
TestWebSocket.webSocket.close();
|
||||
TestWebSocket.webSocketSet = new PromiseSource();
|
||||
|
||||
await nextRetryDelayCalledPromise;
|
||||
|
||||
expect(hubConnection.state).toBe(HubConnectionState.Reconnecting);
|
||||
expect(onreconnectingCount).toBe(1);
|
||||
expect(onreconnectedCount).toBe(0);
|
||||
expect(closeCount).toBe(0);
|
||||
|
||||
await startStarted;
|
||||
await hubConnection.stop();
|
||||
|
||||
expect(hubConnection.state).toBe(HubConnectionState.Disconnected);
|
||||
expect(onreconnectingCount).toBe(1);
|
||||
expect(onreconnectedCount).toBe(0);
|
||||
expect(closeCount).toBe(1);
|
||||
},
|
||||
"Failed to complete negotiation with the server: Error with negotiate",
|
||||
"Failed to start the connection: Error with negotiate");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -221,3 +221,57 @@ export class TestCloseEvent {
|
|||
public CAPTURING_PHASE: number = 0;
|
||||
public NONE: number = 0;
|
||||
}
|
||||
|
||||
export class TestMessageEvent implements MessageEvent {
|
||||
constructor(data: any) {
|
||||
this.data = data;
|
||||
}
|
||||
public data: any;
|
||||
public lastEventId: string = "";
|
||||
public origin: string = "";
|
||||
public ports: MessagePort[] = [];
|
||||
public source: Window | null = null;
|
||||
public composed: boolean = false;
|
||||
public composedPath(): EventTarget[];
|
||||
public composedPath(): any[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public code: number = 0;
|
||||
public reason: string = "";
|
||||
public wasClean: boolean = false;
|
||||
public initMessageEvent(typeArg: string, canBubbleArg: boolean, cancelableArg: boolean, data: any, origin: string, lastEventId: string): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public bubbles: boolean = false;
|
||||
public cancelBubble: boolean = false;
|
||||
public cancelable: boolean = false;
|
||||
public currentTarget!: EventTarget;
|
||||
public defaultPrevented: boolean = false;
|
||||
public eventPhase: number = 0;
|
||||
public isTrusted: boolean = false;
|
||||
public returnValue: boolean = false;
|
||||
public scoped: boolean = false;
|
||||
public srcElement!: Element | null;
|
||||
public target!: EventTarget;
|
||||
public timeStamp: number = 0;
|
||||
public type: string = "";
|
||||
public deepPath(): EventTarget[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public initEvent(type: string, bubbles?: boolean | undefined, cancelable?: boolean | undefined): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public preventDefault(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public stopImmediatePropagation(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public stopPropagation(): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public AT_TARGET: number = 0;
|
||||
public BUBBLING_PHASE: number = 0;
|
||||
public CAPTURING_PHASE: number = 0;
|
||||
public NONE: number = 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue