Fix cleaning send queue on restart (#18511)

This commit is contained in:
Brennan 2020-01-24 13:26:45 -08:00 committed by Doug Bunting
parent f789f0e959
commit 66ddf3523a
8 changed files with 284 additions and 1183 deletions

View File

@ -18,12 +18,12 @@
"es6-promise": "^4.2.4",
"jasmine": "^3.2.0",
"jasmine-core": "^3.2.1",
"karma": "^3.0.0",
"karma": "^4.4.1",
"karma-chrome-launcher": "^2.2.0",
"karma-edge-launcher": "^0.4.2",
"karma-firefox-launcher": "^1.1.0",
"karma-firefox-launcher": "^1.3.0",
"karma-ie-launcher": "^1.0.0",
"karma-jasmine": "^1.1.2",
"karma-jasmine": "^3.1.0",
"karma-junit-reporter": "^1.2.0",
"karma-mocha-reporter": "^2.2.5",
"karma-safari-launcher": "^1.0.0",
@ -31,8 +31,8 @@
"karma-sourcemap-loader": "^0.3.7",
"karma-summary-reporter": "^1.6.0",
"rxjs": "^6.3.3",
"ts-node": "^4.1.0",
"typescript": "^2.7.1",
"ts-node": "^8.6.2",
"typescript": "^3.7.5",
"ws": " ^6.0.0"
},
"scripts": {

View File

@ -43,17 +43,17 @@ try {
}
// We use the launchers themselves to figure out if the browser exists. It's a bit sneaky, but it works.
tryAddBrowser("ChromeHeadlessNoSandbox", new ChromeHeadlessBrowser(() => { }, {}));
tryAddBrowser("ChromiumHeadlessIgnoreCert", new ChromiumHeadlessBrowser(() => { }, {}));
if (!tryAddBrowser("FirefoxHeadless", new FirefoxHeadlessBrowser(0, () => { }, {}))) {
tryAddBrowser("FirefoxDeveloperHeadless", new FirefoxDeveloperHeadlessBrowser(0, () => { }, {}));
tryAddBrowser("ChromeHeadlessNoSandbox", ChromeHeadlessBrowser.prototype);
tryAddBrowser("ChromiumHeadlessIgnoreCert", ChromiumHeadlessBrowser.prototype);
if (!tryAddBrowser("FirefoxHeadless", FirefoxHeadlessBrowser.prototype)) {
tryAddBrowser("FirefoxDeveloperHeadless", FirefoxDeveloperHeadlessBrowser.prototype);
}
// We need to receive an argument from the caller, but globals don't seem to work, so we use an environment variable.
if (process.env.ASPNETCORE_SIGNALR_TEST_ALL_BROWSERS === "true") {
tryAddBrowser("Edge", new EdgeBrowser(() => { }, { create() { } }));
tryAddBrowser("IE", new IEBrowser(() => { }, { create() { } }, {}));
tryAddBrowser("Safari", new SafariBrowser(() => { }, {}));
tryAddBrowser("Edge", EdgeBrowser.prototype);
tryAddBrowser("IE", IEBrowser.prototype);
tryAddBrowser("Safari", SafariBrowser.prototype);
}
module.exports = createKarmaConfig({

File diff suppressed because it is too large Load Diff

View File

@ -194,15 +194,6 @@ export class HttpConnection implements IConnection {
// This exception is returned to the user as a rejected Promise from the start method.
}
if (this.sendQueue) {
try {
await this.sendQueue.stop();
} catch (e) {
this.logger.log(LogLevel.Error, `TransportSendQueue.stop() threw error '${e}'.`);
}
this.sendQueue = undefined;
}
// The transport's onclose will trigger stopConnection which will run our onclose event.
// The transport should always be set if currently connected. If it wasn't set, it's likely because
// stop was called during start() and start() failed.
@ -490,14 +481,22 @@ export class HttpConnection implements IConnection {
this.logger.log(LogLevel.Information, "Connection disconnected.");
}
if (this.sendQueue) {
this.sendQueue.stop().catch((e) => {
this.logger.log(LogLevel.Error, `TransportSendQueue.stop() threw error '${e}'.`);
});
this.sendQueue = undefined;
}
this.connectionId = undefined;
this.connectionState = ConnectionState.Disconnected;
if (this.onclose && this.connectionStarted) {
if (this.connectionStarted) {
this.connectionStarted = false;
try {
this.onclose(error);
if (this.onclose) {
this.onclose(error);
}
} catch (e) {
this.logger.log(LogLevel.Error, `HttpConnection.onclose(${error}) threw error '${e}'.`);
}

View File

@ -35,6 +35,13 @@ export class NodeHttpClient extends HttpClient {
}
public send(httpRequest: HttpRequest): Promise<HttpResponse> {
// Check that abort was not signaled before calling send
if (httpRequest.abortSignal) {
if (httpRequest.abortSignal.aborted) {
return Promise.reject(new AbortError());
}
}
return new Promise<HttpResponse>((resolve, reject) => {
let requestBody: Buffer | string;

View File

@ -35,7 +35,6 @@ export class WebSocketTransport implements ITransport {
Arg.isRequired(url, "url");
Arg.isRequired(transferFormat, "transferFormat");
Arg.isIn(transferFormat, TransferFormat, "transferFormat");
this.logger.log(LogLevel.Trace, "(WebSockets transport) Connecting.");
if (this.accessTokenFactory) {
@ -128,13 +127,6 @@ export class WebSocketTransport implements ITransport {
public stop(): Promise<void> {
if (this.webSocket) {
// Clear websocket handlers because we are considering the socket closed now
this.webSocket.onclose = () => {};
this.webSocket.onmessage = () => {};
this.webSocket.onerror = () => {};
this.webSocket.close();
this.webSocket = undefined;
// Manually invoke onclose callback inline so we know the HttpConnection was closed properly before returning
// This also solves an issue where websocket.onclose could take 18+ seconds to trigger during network disconnects
this.close(undefined);
@ -145,6 +137,15 @@ export class WebSocketTransport implements ITransport {
private close(event?: CloseEvent): void {
// webSocket will be null if the transport did not start successfully
if (this.webSocket) {
// Clear websocket handlers because we are considering the socket closed now
this.webSocket.onclose = () => {};
this.webSocket.onmessage = () => {};
this.webSocket.onerror = () => {};
this.webSocket.close();
this.webSocket = undefined;
}
this.logger.log(LogLevel.Trace, "(WebSockets transport) socket closed.");
if (this.onclose) {
if (event && (event.wasClean === false || event.code !== 1000)) {

View File

@ -1124,6 +1124,44 @@ describe("HttpConnection", () => {
"Failed to start the transport 'WebSockets': Error: There was an error with the transport.");
});
it("send after restarting connection works", async () => {
await VerifyLogger.run(async (logger) => {
const options: IHttpConnectionOptions = {
...commonOptions,
WebSocket: TestWebSocket,
httpClient: new TestHttpClient()
.on("POST", () => defaultNegotiateResponse)
.on("GET", () => ""),
logger,
} as IHttpConnectionOptions;
const connection = new HttpConnection("http://tempuri.org", options);
const closePromise = new PromiseSource();
connection.onclose = (e) => {
closePromise.resolve();
};
TestWebSocket.webSocketSet = new PromiseSource();
let startPromise = connection.start(TransferFormat.Text);
await TestWebSocket.webSocketSet;
await TestWebSocket.webSocket.openSet;
TestWebSocket.webSocket.onopen(new TestEvent());
await startPromise;
await connection.send("text");
TestWebSocket.webSocket.close();
TestWebSocket.webSocketSet = new PromiseSource();
await closePromise;
startPromise = connection.start(TransferFormat.Text);
await TestWebSocket.webSocketSet;
TestWebSocket.webSocket.onopen(new TestEvent());
await startPromise;
await connection.send("text");
});
});
describe(".constructor", () => {
it("throws if no Url is provided", async () => {
// Force TypeScript to let us call the constructor incorrectly :)

View File

@ -12,6 +12,7 @@ export class TestWebSocket {
public protocol: string;
public readyState: number = 1;
public url: string;
public closed: boolean = false;
public static webSocketSet: PromiseSource;
public static webSocket: TestWebSocket;
@ -26,7 +27,10 @@ export class TestWebSocket {
}
public get onopen(): (this: WebSocket, evt: Event) => any {
return this._onopen!;
return (e) => {
this._onopen!(e);
this.readyState = this.OPEN;
};
}
// tslint:disable-next-line:variable-name
@ -38,18 +42,26 @@ export class TestWebSocket {
}
public get onclose(): (this: WebSocket, evt: Event) => any {
return this._onclose!;
return (e) => {
this._onclose!(e);
this.readyState = this.CLOSED;
};
}
public close(code?: number | undefined, reason?: string | undefined): void {
this.closed = true;
const closeEvent = new TestCloseEvent();
closeEvent.code = code || 1000;
closeEvent.reason = reason!;
closeEvent.wasClean = closeEvent.code === 1000;
this.readyState = this.CLOSED;
this.onclose(closeEvent);
}
public send(data: string | ArrayBuffer | Blob | ArrayBufferView): void {
if (this.closed) {
throw new Error(`cannot send from a closed transport: '${data}'`);
}
this.receivedData.push(data);
}