diff --git a/clients/ts/FunctionalTests/ts/HubConnectionTests.ts b/clients/ts/FunctionalTests/ts/HubConnectionTests.ts index 6a4a84e853..dabf807aeb 100644 --- a/clients/ts/FunctionalTests/ts/HubConnectionTests.ts +++ b/clients/ts/FunctionalTests/ts/HubConnectionTests.ts @@ -1,7 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -import { DefaultHttpClient, HttpClient, HttpRequest, HttpResponse, HttpTransportType, HubConnection, HubConnectionBuilder, IHttpConnectionOptions, IStreamSubscriber, JsonHubProtocol, LogLevel } from "@aspnet/signalr"; +import { AbortError, DefaultHttpClient, HttpClient, HttpRequest, HttpResponse, HttpTransportType, HubConnection, HubConnectionBuilder, IHttpConnectionOptions, IStreamSubscriber, JsonHubProtocol, LogLevel } from "@aspnet/signalr"; import { MessagePackHubProtocol } from "@aspnet/signalr-protocol-msgpack"; import { eachTransport, eachTransportAndProtocol } from "./Common"; @@ -546,32 +546,6 @@ describe("hubConnection", () => { } }); - it("can connect to hub with authorization using async token factory", async (done) => { - const message = "你好,世界!"; - - try { - const hubConnection = getConnectionBuilder(transportType, "/authorizedhub", { - accessTokenFactory: () => getJwtToken("http://" + document.location.host + "/generateJwtToken"), - }).build(); - - hubConnection.onclose((error) => { - expect(error).toBe(undefined); - done(); - }); - await hubConnection.start(); - const response = await hubConnection.invoke("Echo", message); - - expect(response).toEqual(message); - - await hubConnection.stop(); - - done(); - } catch (err) { - fail(err); - done(); - } - }); - if (transportType !== HttpTransportType.LongPolling) { it("terminates if no messages received within timeout interval", (done) => { const hubConnection = getConnectionBuilder(transportType).build(); @@ -677,7 +651,16 @@ describe("hubConnection", () => { // Stop the connection and await the poll terminating const stopPromise = hubConnection.stop(); - await testClient.pollPromise; + try { + await testClient.pollPromise; + } catch (e) { + if (e instanceof AbortError) { + // Poll request may have been aborted + } else { + throw e; + } + } + await stopPromise; } catch (e) { fail(e); diff --git a/clients/ts/signalr/src/Errors.ts b/clients/ts/signalr/src/Errors.ts index 4bed501a61..03c99fb0ec 100644 --- a/clients/ts/signalr/src/Errors.ts +++ b/clients/ts/signalr/src/Errors.ts @@ -43,3 +43,22 @@ export class TimeoutError extends Error { this.__proto__ = trueProto; } } + +/** Error thrown when an action is aborted. */ +export class AbortError extends Error { + // tslint:disable-next-line:variable-name + private __proto__: Error; + + /** Constructs a new instance of {@link AbortError}. + * + * @param {string} errorMessage A descriptive error message. + */ + constructor(errorMessage: string = "An abort occurred.") { + const trueProto = new.target.prototype; + super(errorMessage); + + // Workaround issue in Typescript compiler + // https://github.com/Microsoft/TypeScript/issues/13965#issuecomment-278570200 + this.__proto__ = trueProto; + } +} diff --git a/clients/ts/signalr/src/HttpClient.ts b/clients/ts/signalr/src/HttpClient.ts index 7fb94fac0a..eaaaa5fdac 100644 --- a/clients/ts/signalr/src/HttpClient.ts +++ b/clients/ts/signalr/src/HttpClient.ts @@ -2,7 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. import { AbortSignal } from "./AbortController"; -import { HttpError, TimeoutError } from "./Errors"; +import { AbortError, HttpError, TimeoutError } from "./Errors"; import { ILogger, LogLevel } from "./ILogger"; /** Represents an HTTP request. */ @@ -158,6 +158,12 @@ export class DefaultHttpClient extends HttpClient { /** @inheritDoc */ public send(request: HttpRequest): Promise { return new Promise((resolve, reject) => { + // Check that abort was not signaled before calling send + if (request.abortSignal && request.abortSignal.aborted) { + reject(new AbortError()); + return; + } + const xhr = new XMLHttpRequest(); xhr.open(request.method, request.url, true); @@ -178,6 +184,7 @@ export class DefaultHttpClient extends HttpClient { if (request.abortSignal) { request.abortSignal.onabort = () => { xhr.abort(); + reject(new AbortError()); }; } diff --git a/clients/ts/signalr/src/LongPollingTransport.ts b/clients/ts/signalr/src/LongPollingTransport.ts index 6fdc830320..83dd402905 100644 --- a/clients/ts/signalr/src/LongPollingTransport.ts +++ b/clients/ts/signalr/src/LongPollingTransport.ts @@ -8,8 +8,6 @@ import { ILogger, LogLevel } from "./ILogger"; import { ITransport, TransferFormat } from "./ITransport"; import { Arg, getDataDetail, sendMessage } from "./Utils"; -const SHUTDOWN_TIMEOUT = 5 * 1000; - // Not exported from 'index', this type is internal. export class LongPollingTransport implements ITransport { private readonly httpClient: HttpClient; @@ -20,23 +18,21 @@ export class LongPollingTransport implements ITransport { private url: string; private pollXhr: XMLHttpRequest; private pollAbort: AbortController; - private shutdownTimer: any; // We use 'any' because this is an object in NodeJS. But it still gets passed to clearTimeout, so it doesn't really matter - private shutdownTimeout: number; private running: boolean; - private stopped: boolean; + private receiving: Promise; + private closeError: Error; // This is an internal type, not exported from 'index' so this is really just internal. public get pollAborted() { return this.pollAbort.aborted; } - constructor(httpClient: HttpClient, accessTokenFactory: () => string | Promise, logger: ILogger, logMessageContent: boolean, shutdownTimeout?: number) { + constructor(httpClient: HttpClient, accessTokenFactory: () => string | Promise, logger: ILogger, logMessageContent: boolean) { this.httpClient = httpClient; this.accessTokenFactory = accessTokenFactory || (() => null); this.logger = logger; this.pollAbort = new AbortController(); this.logMessageContent = logMessageContent; - this.shutdownTimeout = shutdownTimeout || SHUTDOWN_TIMEOUT; } public async connect(url: string, transferFormat: TransferFormat): Promise { @@ -66,8 +62,6 @@ export class LongPollingTransport implements ITransport { const token = await this.accessTokenFactory(); this.updateHeaderToken(pollOptions, token); - let closeError: Error; - // Make initial long polling request // Server uses first long polling request to finish initializing connection and it returns without data const pollUrl = `${url}&_=${Date.now()}`; @@ -77,14 +71,13 @@ export class LongPollingTransport implements ITransport { this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}`); // Mark running as false so that the poll immediately ends and runs the close logic - closeError = new HttpError(response.statusText, response.statusCode); + this.closeError = new HttpError(response.statusText, response.statusCode); this.running = false; } else { this.running = true; } - this.poll(this.url, pollOptions, closeError); - return Promise.resolve(); + this.receiving = this.poll(this.url, pollOptions); } private updateHeaderToken(request: HttpRequest, token: string) { @@ -100,7 +93,7 @@ export class LongPollingTransport implements ITransport { } } - private async poll(url: string, pollOptions: HttpRequest, closeError: Error): Promise { + private async poll(url: string, pollOptions: HttpRequest): Promise { try { while (this.running) { // We have to get the access token on each poll, in case it changes @@ -120,7 +113,7 @@ export class LongPollingTransport implements ITransport { this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}`); // Unexpected status code - closeError = new HttpError(response.statusText, response.statusCode); + this.closeError = new HttpError(response.statusText, response.statusCode); this.running = false; } else { // Process the response @@ -136,7 +129,7 @@ export class LongPollingTransport implements ITransport { } } catch (e) { if (!this.running) { - // Log but disregard errors that occur after we were stopped by DELETE + // Log but disregard errors that occur after stopping this.logger.log(LogLevel.Trace, `(LongPolling transport) Poll errored after shutdown: ${e.message}`); } else { if (e instanceof TimeoutError) { @@ -144,28 +137,20 @@ export class LongPollingTransport implements ITransport { this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing."); } else { // Close the connection with the error as the result. - closeError = e; + this.closeError = e; this.running = false; } } } } } finally { - // Indicate that we've stopped so the shutdown timer doesn't get registered. - this.stopped = true; + this.logger.log(LogLevel.Trace, "(LongPolling transport) Polling complete."); - // Clean up the shutdown timer if it was registered - if (this.shutdownTimer) { - clearTimeout(this.shutdownTimer); + // We will reach here with pollAborted==false when the server returned a response causing the transport to stop. + // If pollAborted==true then client initiated the stop and the stop method will raise the close event after DELETE is sent. + if (!this.pollAborted) { + this.raiseOnClose(); } - - // Fire our onclosed event - if (this.onclose) { - this.logger.log(LogLevel.Trace, `(LongPolling transport) Firing onclose event. Error: ${closeError || ""}`); - this.onclose(closeError); - } - - this.logger.log(LogLevel.Trace, "(LongPolling transport) Transport finished."); } } @@ -177,9 +162,16 @@ export class LongPollingTransport implements ITransport { } public async stop(): Promise { - // Send a DELETE request to stop the poll + this.logger.log(LogLevel.Trace, "(LongPolling transport) Stopping polling."); + + // Tell receiving loop to stop, abort any current request, and then wait for it to finish + this.running = false; + this.pollAbort.abort(); + try { - this.running = false; + await this.receiving; + + // Send DELETE to clean up long polling on the server this.logger.log(LogLevel.Trace, `(LongPolling transport) sending DELETE request to ${this.url}.`); const deleteOptions: HttpRequest = { @@ -187,19 +179,26 @@ export class LongPollingTransport implements ITransport { }; const token = await this.accessTokenFactory(); this.updateHeaderToken(deleteOptions, token); - const response = await this.httpClient.delete(this.url, deleteOptions); + await this.httpClient.delete(this.url, deleteOptions); - this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request accepted."); + this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request sent."); } finally { - // Abort the poll after the shutdown timeout if the server doesn't stop the poll. - if (!this.stopped) { - this.shutdownTimer = setTimeout(() => { - this.logger.log(LogLevel.Warning, "(LongPolling transport) server did not terminate after DELETE request, canceling poll."); + this.logger.log(LogLevel.Trace, "(LongPolling transport) Stop finished."); - // Abort any outstanding poll - this.pollAbort.abort(); - }, this.shutdownTimeout); + // Raise close event here instead of in polling + // It needs to happen after the DELETE request is sent + this.raiseOnClose(); + } + } + + private raiseOnClose() { + if (this.onclose) { + let logMessage = "(LongPolling transport) Firing onclose event."; + if (this.closeError) { + logMessage += " Error: " + this.closeError; } + this.logger.log(LogLevel.Trace, logMessage); + this.onclose(this.closeError); } } diff --git a/clients/ts/signalr/src/index.ts b/clients/ts/signalr/src/index.ts index 1872133098..266ee9fc69 100644 --- a/clients/ts/signalr/src/index.ts +++ b/clients/ts/signalr/src/index.ts @@ -7,7 +7,7 @@ export const VERSION: string = "0.0.0-DEV_BUILD"; // Everything that users need to access must be exported here. Including interfaces. export { AbortSignal } from "./AbortController"; -export { HttpError, TimeoutError } from "./Errors"; +export { AbortError, HttpError, TimeoutError } from "./Errors"; export { DefaultHttpClient, HttpClient, HttpRequest, HttpResponse } from "./HttpClient"; export { IHttpConnectionOptions } from "./IHttpConnectionOptions"; export { HubConnection, HubConnectionState } from "./HubConnection"; diff --git a/clients/ts/signalr/tests/LongPollingTransport.test.ts b/clients/ts/signalr/tests/LongPollingTransport.test.ts index b801afec19..e8660ff992 100644 --- a/clients/ts/signalr/tests/LongPollingTransport.test.ts +++ b/clients/ts/signalr/tests/LongPollingTransport.test.ts @@ -7,10 +7,10 @@ import { LongPollingTransport } from "../src/LongPollingTransport"; import { ConsoleLogger } from "../src/Utils"; import { TestHttpClient } from "./TestHttpClient"; -import { delay, PromiseSource } from "./Utils"; +import { delay, PromiseSource, SyncPoint } from "./Utils"; describe("LongPollingTransport", () => { - it("shuts down poll after timeout even if server doesn't shut it down on receiving the DELETE", async () => { + it("shuts down polling by aborting in-progress request", async () => { let firstPoll = true; const pollCompleted = new PromiseSource(); const client = new TestHttpClient() @@ -25,100 +25,98 @@ describe("LongPollingTransport", () => { // Signal that the poll has completed. pollCompleted.resolve(); - return new HttpResponse(204); + + return new HttpResponse(200); } }) .on("DELETE", (r) => new HttpResponse(202)); - const transport = new LongPollingTransport(client, null, NullLogger.instance, false, 100); + const transport = new LongPollingTransport(client, null, NullLogger.instance, false); await transport.connect("http://example.com", TransferFormat.Text); - await transport.stop(); + const stopPromise = transport.stop(); - // This should complete within the shutdown timeout await pollCompleted.promise; + + await stopPromise; }); - it("sends DELETE request on stop", async () => { + it("204 server response stops polling and raises onClose", async () => { let firstPoll = true; - const deleteReceived = new PromiseSource(); - const pollCompleted = new PromiseSource(); + let onCloseCalled = false; const client = new TestHttpClient() .on("GET", async (r) => { if (firstPoll) { firstPoll = false; return new HttpResponse(200); } else { - await deleteReceived.promise; - // Force the shutdown timer to be registered by not returning inline - await delay(10); - pollCompleted.resolve(); + // A 204 response will stop the long polling transport + return new HttpResponse(204); + } + }); + const transport = new LongPollingTransport(client, null, NullLogger.instance, false); + + const stopPromise = makeClosedPromise(transport); + + await transport.connect("http://example.com", TransferFormat.Text); + + // Close will be called on transport because of 204 result from polling + await stopPromise; + }); + + it("sends DELETE on stop after polling has finished", async () => { + let firstPoll = true; + let deleteSent = false; + const pollingPromiseSource = new PromiseSource(); + const deleteSyncPoint = new SyncPoint(); + const httpClient = new TestHttpClient() + .on("GET", async (r) => { + if (firstPoll) { + firstPoll = false; + return new HttpResponse(200); + } else { + await pollingPromiseSource.promise; return new HttpResponse(204); } }) - .on("DELETE", (r) => { - deleteReceived.resolve(); + .on("DELETE", async (r) => { + deleteSent = true; + await deleteSyncPoint.waitToContinue(); return new HttpResponse(202); }); - const transport = new LongPollingTransport(client, null, NullLogger.instance, false, 100); + + const transport = new LongPollingTransport(httpClient, null, NullLogger.instance, false); - await transport.connect("http://example.com", TransferFormat.Text); - await transport.stop(); + await transport.connect("http://tempuri.org", TransferFormat.Text); - // This should complete, because the DELETE request triggers it to stop. - await pollCompleted.promise; + // Begin stopping transport + const stopPromise = transport.stop(); + + // Delete will not be sent until polling is finished + expect(deleteSent).toEqual(false); + + // Allow polling to complete + pollingPromiseSource.resolve(); + + // Wait for delete to be called + await deleteSyncPoint.waitForSyncPoint(); + + expect(deleteSent).toEqual(true); + + deleteSyncPoint.continue(); + + // Wait for stop to complete + await stopPromise; }); - - for (const result of [200, 204, 300, new HttpError("Boom", 500), new TimeoutError()]) { - - // Function has a name property but TypeScript doesn't know about it. - const resultName = typeof result === "number" ? result.toString() : (result.constructor as any).name; - - it(`does not fire shutdown timer when poll terminates with ${resultName}`, async () => { - let firstPoll = true; - const deleteReceived = new PromiseSource(); - const pollCompleted = new PromiseSource(); - const client = new TestHttpClient() - .on("GET", async (r) => { - if (firstPoll) { - firstPoll = false; - return new HttpResponse(200); - } else { - await deleteReceived.promise; - // Force the shutdown timer to be registered by not returning inline - await delay(10); - pollCompleted.resolve(); - - if (typeof result === "number") { - return new HttpResponse(result); - } else { - throw result; - } - } - }) - .on("DELETE", (r) => { - deleteReceived.resolve(); - return new HttpResponse(202); - }); - const logMessages: string[] = []; - const transport = new LongPollingTransport(client, null, { - log(level: LogLevel, message: string) { - logMessages.push(message); - }, - }, false, 100); - - await transport.connect("http://example.com", TransferFormat.Text); - await transport.stop(); - - // This should complete, because the DELETE request triggers it to stop. - await pollCompleted.promise; - - // Wait for the shutdown timeout to elapse - // This can be much cleaner when we port to Jest because it has a built-in set of - // fake timers! - await delay(150); - - // The pollAbort token should be left unaborted because we shut down gracefully. - expect(transport.pollAborted).toBe(false); - }); - } }); + +function makeClosedPromise(transport: LongPollingTransport): Promise { + const closed = new PromiseSource(); + transport.onclose = (error) => { + if (error) { + closed.reject(error); + } else { + closed.resolve(); + } + }; + return closed.promise; +} \ No newline at end of file diff --git a/clients/ts/signalr/tests/Utils.ts b/clients/ts/signalr/tests/Utils.ts index 7d1c881e9f..b82a9046ea 100644 --- a/clients/ts/signalr/tests/Utils.ts +++ b/clients/ts/signalr/tests/Utils.ts @@ -38,3 +38,26 @@ export class PromiseSource implements Promise { return this.promise.catch(onrejected); } } + +export class SyncPoint { + private atSyncPoint: PromiseSource; + private continueFromSyncPoint: PromiseSource; + + constructor() { + this.atSyncPoint = new PromiseSource(); + this.continueFromSyncPoint = new PromiseSource(); + } + + public waitForSyncPoint(): Promise { + return this.atSyncPoint.promise; + } + + public continue() { + this.continueFromSyncPoint.resolve(); + } + + public waitToContinue(): Promise { + this.atSyncPoint.resolve(); + return this.continueFromSyncPoint.promise; + } +} \ No newline at end of file