Refactor LongPollingTransport stop in TS client to align with C# client (#2292)
This commit is contained in:
parent
a699b61ffa
commit
44f914b9b2
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
import { DefaultHttpClient, HttpClient, HttpRequest, HttpResponse, HttpTransportType, HubConnection, HubConnectionBuilder, IHttpConnectionOptions, IStreamSubscriber, JsonHubProtocol, LogLevel } from "@aspnet/signalr";
|
||||
import { AbortError, DefaultHttpClient, HttpClient, HttpRequest, HttpResponse, HttpTransportType, HubConnection, HubConnectionBuilder, IHttpConnectionOptions, IStreamSubscriber, JsonHubProtocol, LogLevel } from "@aspnet/signalr";
|
||||
import { MessagePackHubProtocol } from "@aspnet/signalr-protocol-msgpack";
|
||||
|
||||
import { eachTransport, eachTransportAndProtocol } from "./Common";
|
||||
|
|
@ -546,32 +546,6 @@ describe("hubConnection", () => {
|
|||
}
|
||||
});
|
||||
|
||||
it("can connect to hub with authorization using async token factory", async (done) => {
|
||||
const message = "你好,世界!";
|
||||
|
||||
try {
|
||||
const hubConnection = getConnectionBuilder(transportType, "/authorizedhub", {
|
||||
accessTokenFactory: () => getJwtToken("http://" + document.location.host + "/generateJwtToken"),
|
||||
}).build();
|
||||
|
||||
hubConnection.onclose((error) => {
|
||||
expect(error).toBe(undefined);
|
||||
done();
|
||||
});
|
||||
await hubConnection.start();
|
||||
const response = await hubConnection.invoke("Echo", message);
|
||||
|
||||
expect(response).toEqual(message);
|
||||
|
||||
await hubConnection.stop();
|
||||
|
||||
done();
|
||||
} catch (err) {
|
||||
fail(err);
|
||||
done();
|
||||
}
|
||||
});
|
||||
|
||||
if (transportType !== HttpTransportType.LongPolling) {
|
||||
it("terminates if no messages received within timeout interval", (done) => {
|
||||
const hubConnection = getConnectionBuilder(transportType).build();
|
||||
|
|
@ -677,7 +651,16 @@ describe("hubConnection", () => {
|
|||
// Stop the connection and await the poll terminating
|
||||
const stopPromise = hubConnection.stop();
|
||||
|
||||
await testClient.pollPromise;
|
||||
try {
|
||||
await testClient.pollPromise;
|
||||
} catch (e) {
|
||||
if (e instanceof AbortError) {
|
||||
// Poll request may have been aborted
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
await stopPromise;
|
||||
} catch (e) {
|
||||
fail(e);
|
||||
|
|
|
|||
|
|
@ -43,3 +43,22 @@ export class TimeoutError extends Error {
|
|||
this.__proto__ = trueProto;
|
||||
}
|
||||
}
|
||||
|
||||
/** Error thrown when an action is aborted. */
|
||||
export class AbortError extends Error {
|
||||
// tslint:disable-next-line:variable-name
|
||||
private __proto__: Error;
|
||||
|
||||
/** Constructs a new instance of {@link AbortError}.
|
||||
*
|
||||
* @param {string} errorMessage A descriptive error message.
|
||||
*/
|
||||
constructor(errorMessage: string = "An abort occurred.") {
|
||||
const trueProto = new.target.prototype;
|
||||
super(errorMessage);
|
||||
|
||||
// Workaround issue in Typescript compiler
|
||||
// https://github.com/Microsoft/TypeScript/issues/13965#issuecomment-278570200
|
||||
this.__proto__ = trueProto;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
import { AbortSignal } from "./AbortController";
|
||||
import { HttpError, TimeoutError } from "./Errors";
|
||||
import { AbortError, HttpError, TimeoutError } from "./Errors";
|
||||
import { ILogger, LogLevel } from "./ILogger";
|
||||
|
||||
/** Represents an HTTP request. */
|
||||
|
|
@ -158,6 +158,12 @@ export class DefaultHttpClient extends HttpClient {
|
|||
/** @inheritDoc */
|
||||
public send(request: HttpRequest): Promise<HttpResponse> {
|
||||
return new Promise<HttpResponse>((resolve, reject) => {
|
||||
// Check that abort was not signaled before calling send
|
||||
if (request.abortSignal && request.abortSignal.aborted) {
|
||||
reject(new AbortError());
|
||||
return;
|
||||
}
|
||||
|
||||
const xhr = new XMLHttpRequest();
|
||||
|
||||
xhr.open(request.method, request.url, true);
|
||||
|
|
@ -178,6 +184,7 @@ export class DefaultHttpClient extends HttpClient {
|
|||
if (request.abortSignal) {
|
||||
request.abortSignal.onabort = () => {
|
||||
xhr.abort();
|
||||
reject(new AbortError());
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,8 +8,6 @@ import { ILogger, LogLevel } from "./ILogger";
|
|||
import { ITransport, TransferFormat } from "./ITransport";
|
||||
import { Arg, getDataDetail, sendMessage } from "./Utils";
|
||||
|
||||
const SHUTDOWN_TIMEOUT = 5 * 1000;
|
||||
|
||||
// Not exported from 'index', this type is internal.
|
||||
export class LongPollingTransport implements ITransport {
|
||||
private readonly httpClient: HttpClient;
|
||||
|
|
@ -20,23 +18,21 @@ export class LongPollingTransport implements ITransport {
|
|||
private url: string;
|
||||
private pollXhr: XMLHttpRequest;
|
||||
private pollAbort: AbortController;
|
||||
private shutdownTimer: any; // We use 'any' because this is an object in NodeJS. But it still gets passed to clearTimeout, so it doesn't really matter
|
||||
private shutdownTimeout: number;
|
||||
private running: boolean;
|
||||
private stopped: boolean;
|
||||
private receiving: Promise<void>;
|
||||
private closeError: Error;
|
||||
|
||||
// This is an internal type, not exported from 'index' so this is really just internal.
|
||||
public get pollAborted() {
|
||||
return this.pollAbort.aborted;
|
||||
}
|
||||
|
||||
constructor(httpClient: HttpClient, accessTokenFactory: () => string | Promise<string>, logger: ILogger, logMessageContent: boolean, shutdownTimeout?: number) {
|
||||
constructor(httpClient: HttpClient, accessTokenFactory: () => string | Promise<string>, logger: ILogger, logMessageContent: boolean) {
|
||||
this.httpClient = httpClient;
|
||||
this.accessTokenFactory = accessTokenFactory || (() => null);
|
||||
this.logger = logger;
|
||||
this.pollAbort = new AbortController();
|
||||
this.logMessageContent = logMessageContent;
|
||||
this.shutdownTimeout = shutdownTimeout || SHUTDOWN_TIMEOUT;
|
||||
}
|
||||
|
||||
public async connect(url: string, transferFormat: TransferFormat): Promise<void> {
|
||||
|
|
@ -66,8 +62,6 @@ export class LongPollingTransport implements ITransport {
|
|||
const token = await this.accessTokenFactory();
|
||||
this.updateHeaderToken(pollOptions, token);
|
||||
|
||||
let closeError: Error;
|
||||
|
||||
// Make initial long polling request
|
||||
// Server uses first long polling request to finish initializing connection and it returns without data
|
||||
const pollUrl = `${url}&_=${Date.now()}`;
|
||||
|
|
@ -77,14 +71,13 @@ export class LongPollingTransport implements ITransport {
|
|||
this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}`);
|
||||
|
||||
// Mark running as false so that the poll immediately ends and runs the close logic
|
||||
closeError = new HttpError(response.statusText, response.statusCode);
|
||||
this.closeError = new HttpError(response.statusText, response.statusCode);
|
||||
this.running = false;
|
||||
} else {
|
||||
this.running = true;
|
||||
}
|
||||
|
||||
this.poll(this.url, pollOptions, closeError);
|
||||
return Promise.resolve();
|
||||
this.receiving = this.poll(this.url, pollOptions);
|
||||
}
|
||||
|
||||
private updateHeaderToken(request: HttpRequest, token: string) {
|
||||
|
|
@ -100,7 +93,7 @@ export class LongPollingTransport implements ITransport {
|
|||
}
|
||||
}
|
||||
|
||||
private async poll(url: string, pollOptions: HttpRequest, closeError: Error): Promise<void> {
|
||||
private async poll(url: string, pollOptions: HttpRequest): Promise<void> {
|
||||
try {
|
||||
while (this.running) {
|
||||
// We have to get the access token on each poll, in case it changes
|
||||
|
|
@ -120,7 +113,7 @@ export class LongPollingTransport implements ITransport {
|
|||
this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}`);
|
||||
|
||||
// Unexpected status code
|
||||
closeError = new HttpError(response.statusText, response.statusCode);
|
||||
this.closeError = new HttpError(response.statusText, response.statusCode);
|
||||
this.running = false;
|
||||
} else {
|
||||
// Process the response
|
||||
|
|
@ -136,7 +129,7 @@ export class LongPollingTransport implements ITransport {
|
|||
}
|
||||
} catch (e) {
|
||||
if (!this.running) {
|
||||
// Log but disregard errors that occur after we were stopped by DELETE
|
||||
// Log but disregard errors that occur after stopping
|
||||
this.logger.log(LogLevel.Trace, `(LongPolling transport) Poll errored after shutdown: ${e.message}`);
|
||||
} else {
|
||||
if (e instanceof TimeoutError) {
|
||||
|
|
@ -144,28 +137,20 @@ export class LongPollingTransport implements ITransport {
|
|||
this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing.");
|
||||
} else {
|
||||
// Close the connection with the error as the result.
|
||||
closeError = e;
|
||||
this.closeError = e;
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// Indicate that we've stopped so the shutdown timer doesn't get registered.
|
||||
this.stopped = true;
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) Polling complete.");
|
||||
|
||||
// Clean up the shutdown timer if it was registered
|
||||
if (this.shutdownTimer) {
|
||||
clearTimeout(this.shutdownTimer);
|
||||
// We will reach here with pollAborted==false when the server returned a response causing the transport to stop.
|
||||
// If pollAborted==true then client initiated the stop and the stop method will raise the close event after DELETE is sent.
|
||||
if (!this.pollAborted) {
|
||||
this.raiseOnClose();
|
||||
}
|
||||
|
||||
// Fire our onclosed event
|
||||
if (this.onclose) {
|
||||
this.logger.log(LogLevel.Trace, `(LongPolling transport) Firing onclose event. Error: ${closeError || "<undefined>"}`);
|
||||
this.onclose(closeError);
|
||||
}
|
||||
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) Transport finished.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -177,9 +162,16 @@ export class LongPollingTransport implements ITransport {
|
|||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
// Send a DELETE request to stop the poll
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) Stopping polling.");
|
||||
|
||||
// Tell receiving loop to stop, abort any current request, and then wait for it to finish
|
||||
this.running = false;
|
||||
this.pollAbort.abort();
|
||||
|
||||
try {
|
||||
this.running = false;
|
||||
await this.receiving;
|
||||
|
||||
// Send DELETE to clean up long polling on the server
|
||||
this.logger.log(LogLevel.Trace, `(LongPolling transport) sending DELETE request to ${this.url}.`);
|
||||
|
||||
const deleteOptions: HttpRequest = {
|
||||
|
|
@ -187,19 +179,26 @@ export class LongPollingTransport implements ITransport {
|
|||
};
|
||||
const token = await this.accessTokenFactory();
|
||||
this.updateHeaderToken(deleteOptions, token);
|
||||
const response = await this.httpClient.delete(this.url, deleteOptions);
|
||||
await this.httpClient.delete(this.url, deleteOptions);
|
||||
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request accepted.");
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request sent.");
|
||||
} finally {
|
||||
// Abort the poll after the shutdown timeout if the server doesn't stop the poll.
|
||||
if (!this.stopped) {
|
||||
this.shutdownTimer = setTimeout(() => {
|
||||
this.logger.log(LogLevel.Warning, "(LongPolling transport) server did not terminate after DELETE request, canceling poll.");
|
||||
this.logger.log(LogLevel.Trace, "(LongPolling transport) Stop finished.");
|
||||
|
||||
// Abort any outstanding poll
|
||||
this.pollAbort.abort();
|
||||
}, this.shutdownTimeout);
|
||||
// Raise close event here instead of in polling
|
||||
// It needs to happen after the DELETE request is sent
|
||||
this.raiseOnClose();
|
||||
}
|
||||
}
|
||||
|
||||
private raiseOnClose() {
|
||||
if (this.onclose) {
|
||||
let logMessage = "(LongPolling transport) Firing onclose event.";
|
||||
if (this.closeError) {
|
||||
logMessage += " Error: " + this.closeError;
|
||||
}
|
||||
this.logger.log(LogLevel.Trace, logMessage);
|
||||
this.onclose(this.closeError);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ export const VERSION: string = "0.0.0-DEV_BUILD";
|
|||
|
||||
// Everything that users need to access must be exported here. Including interfaces.
|
||||
export { AbortSignal } from "./AbortController";
|
||||
export { HttpError, TimeoutError } from "./Errors";
|
||||
export { AbortError, HttpError, TimeoutError } from "./Errors";
|
||||
export { DefaultHttpClient, HttpClient, HttpRequest, HttpResponse } from "./HttpClient";
|
||||
export { IHttpConnectionOptions } from "./IHttpConnectionOptions";
|
||||
export { HubConnection, HubConnectionState } from "./HubConnection";
|
||||
|
|
|
|||
|
|
@ -7,10 +7,10 @@ import { LongPollingTransport } from "../src/LongPollingTransport";
|
|||
import { ConsoleLogger } from "../src/Utils";
|
||||
|
||||
import { TestHttpClient } from "./TestHttpClient";
|
||||
import { delay, PromiseSource } from "./Utils";
|
||||
import { delay, PromiseSource, SyncPoint } from "./Utils";
|
||||
|
||||
describe("LongPollingTransport", () => {
|
||||
it("shuts down poll after timeout even if server doesn't shut it down on receiving the DELETE", async () => {
|
||||
it("shuts down polling by aborting in-progress request", async () => {
|
||||
let firstPoll = true;
|
||||
const pollCompleted = new PromiseSource();
|
||||
const client = new TestHttpClient()
|
||||
|
|
@ -25,100 +25,98 @@ describe("LongPollingTransport", () => {
|
|||
|
||||
// Signal that the poll has completed.
|
||||
pollCompleted.resolve();
|
||||
return new HttpResponse(204);
|
||||
|
||||
return new HttpResponse(200);
|
||||
}
|
||||
})
|
||||
.on("DELETE", (r) => new HttpResponse(202));
|
||||
const transport = new LongPollingTransport(client, null, NullLogger.instance, false, 100);
|
||||
const transport = new LongPollingTransport(client, null, NullLogger.instance, false);
|
||||
|
||||
await transport.connect("http://example.com", TransferFormat.Text);
|
||||
await transport.stop();
|
||||
const stopPromise = transport.stop();
|
||||
|
||||
// This should complete within the shutdown timeout
|
||||
await pollCompleted.promise;
|
||||
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("sends DELETE request on stop", async () => {
|
||||
it("204 server response stops polling and raises onClose", async () => {
|
||||
let firstPoll = true;
|
||||
const deleteReceived = new PromiseSource();
|
||||
const pollCompleted = new PromiseSource();
|
||||
let onCloseCalled = false;
|
||||
const client = new TestHttpClient()
|
||||
.on("GET", async (r) => {
|
||||
if (firstPoll) {
|
||||
firstPoll = false;
|
||||
return new HttpResponse(200);
|
||||
} else {
|
||||
await deleteReceived.promise;
|
||||
// Force the shutdown timer to be registered by not returning inline
|
||||
await delay(10);
|
||||
pollCompleted.resolve();
|
||||
// A 204 response will stop the long polling transport
|
||||
return new HttpResponse(204);
|
||||
}
|
||||
});
|
||||
const transport = new LongPollingTransport(client, null, NullLogger.instance, false);
|
||||
|
||||
const stopPromise = makeClosedPromise(transport);
|
||||
|
||||
await transport.connect("http://example.com", TransferFormat.Text);
|
||||
|
||||
// Close will be called on transport because of 204 result from polling
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("sends DELETE on stop after polling has finished", async () => {
|
||||
let firstPoll = true;
|
||||
let deleteSent = false;
|
||||
const pollingPromiseSource = new PromiseSource();
|
||||
const deleteSyncPoint = new SyncPoint();
|
||||
const httpClient = new TestHttpClient()
|
||||
.on("GET", async (r) => {
|
||||
if (firstPoll) {
|
||||
firstPoll = false;
|
||||
return new HttpResponse(200);
|
||||
} else {
|
||||
await pollingPromiseSource.promise;
|
||||
return new HttpResponse(204);
|
||||
}
|
||||
})
|
||||
.on("DELETE", (r) => {
|
||||
deleteReceived.resolve();
|
||||
.on("DELETE", async (r) => {
|
||||
deleteSent = true;
|
||||
await deleteSyncPoint.waitToContinue();
|
||||
return new HttpResponse(202);
|
||||
});
|
||||
const transport = new LongPollingTransport(client, null, NullLogger.instance, false, 100);
|
||||
|
||||
const transport = new LongPollingTransport(httpClient, null, NullLogger.instance, false);
|
||||
|
||||
await transport.connect("http://example.com", TransferFormat.Text);
|
||||
await transport.stop();
|
||||
await transport.connect("http://tempuri.org", TransferFormat.Text);
|
||||
|
||||
// This should complete, because the DELETE request triggers it to stop.
|
||||
await pollCompleted.promise;
|
||||
// Begin stopping transport
|
||||
const stopPromise = transport.stop();
|
||||
|
||||
// Delete will not be sent until polling is finished
|
||||
expect(deleteSent).toEqual(false);
|
||||
|
||||
// Allow polling to complete
|
||||
pollingPromiseSource.resolve();
|
||||
|
||||
// Wait for delete to be called
|
||||
await deleteSyncPoint.waitForSyncPoint();
|
||||
|
||||
expect(deleteSent).toEqual(true);
|
||||
|
||||
deleteSyncPoint.continue();
|
||||
|
||||
// Wait for stop to complete
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
for (const result of [200, 204, 300, new HttpError("Boom", 500), new TimeoutError()]) {
|
||||
|
||||
// Function has a name property but TypeScript doesn't know about it.
|
||||
const resultName = typeof result === "number" ? result.toString() : (result.constructor as any).name;
|
||||
|
||||
it(`does not fire shutdown timer when poll terminates with ${resultName}`, async () => {
|
||||
let firstPoll = true;
|
||||
const deleteReceived = new PromiseSource();
|
||||
const pollCompleted = new PromiseSource();
|
||||
const client = new TestHttpClient()
|
||||
.on("GET", async (r) => {
|
||||
if (firstPoll) {
|
||||
firstPoll = false;
|
||||
return new HttpResponse(200);
|
||||
} else {
|
||||
await deleteReceived.promise;
|
||||
// Force the shutdown timer to be registered by not returning inline
|
||||
await delay(10);
|
||||
pollCompleted.resolve();
|
||||
|
||||
if (typeof result === "number") {
|
||||
return new HttpResponse(result);
|
||||
} else {
|
||||
throw result;
|
||||
}
|
||||
}
|
||||
})
|
||||
.on("DELETE", (r) => {
|
||||
deleteReceived.resolve();
|
||||
return new HttpResponse(202);
|
||||
});
|
||||
const logMessages: string[] = [];
|
||||
const transport = new LongPollingTransport(client, null, {
|
||||
log(level: LogLevel, message: string) {
|
||||
logMessages.push(message);
|
||||
},
|
||||
}, false, 100);
|
||||
|
||||
await transport.connect("http://example.com", TransferFormat.Text);
|
||||
await transport.stop();
|
||||
|
||||
// This should complete, because the DELETE request triggers it to stop.
|
||||
await pollCompleted.promise;
|
||||
|
||||
// Wait for the shutdown timeout to elapse
|
||||
// This can be much cleaner when we port to Jest because it has a built-in set of
|
||||
// fake timers!
|
||||
await delay(150);
|
||||
|
||||
// The pollAbort token should be left unaborted because we shut down gracefully.
|
||||
expect(transport.pollAborted).toBe(false);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
function makeClosedPromise(transport: LongPollingTransport): Promise<void> {
|
||||
const closed = new PromiseSource();
|
||||
transport.onclose = (error) => {
|
||||
if (error) {
|
||||
closed.reject(error);
|
||||
} else {
|
||||
closed.resolve();
|
||||
}
|
||||
};
|
||||
return closed.promise;
|
||||
}
|
||||
|
|
@ -38,3 +38,26 @@ export class PromiseSource<T = void> implements Promise<T> {
|
|||
return this.promise.catch(onrejected);
|
||||
}
|
||||
}
|
||||
|
||||
export class SyncPoint {
|
||||
private atSyncPoint: PromiseSource;
|
||||
private continueFromSyncPoint: PromiseSource;
|
||||
|
||||
constructor() {
|
||||
this.atSyncPoint = new PromiseSource();
|
||||
this.continueFromSyncPoint = new PromiseSource();
|
||||
}
|
||||
|
||||
public waitForSyncPoint(): Promise<void> {
|
||||
return this.atSyncPoint.promise;
|
||||
}
|
||||
|
||||
public continue() {
|
||||
this.continueFromSyncPoint.resolve();
|
||||
}
|
||||
|
||||
public waitToContinue(): Promise<void> {
|
||||
this.atSyncPoint.resolve();
|
||||
return this.continueFromSyncPoint.promise;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue