parent
ae9c3cf04d
commit
f4bb309994
|
|
@ -320,7 +320,7 @@ describe("Connection", () => {
|
|||
// mode: TransferMode : TransferMode.Text
|
||||
connect(url: string, requestedTransferMode: TransferMode): Promise<TransferMode> { return Promise.resolve(transportTransferMode); },
|
||||
send(data: any): Promise<void> { return Promise.resolve(); },
|
||||
stop(): void {},
|
||||
stop(): void { },
|
||||
onreceive: null,
|
||||
onclose: null,
|
||||
mode: transportTransferMode
|
||||
|
|
|
|||
|
|
@ -10,7 +10,8 @@ import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/For
|
|||
import { ILogger, LogLevel } from "../Microsoft.AspNetCore.SignalR.Client.TS/ILogger"
|
||||
import { MessageType } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol"
|
||||
|
||||
import { asyncit as it, captureException } from './JasmineUtils';
|
||||
import { asyncit as it, captureException, delay, PromiseSource } from './Utils';
|
||||
import { IHubConnectionOptions } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubConnectionOptions";
|
||||
|
||||
describe("HubConnection", () => {
|
||||
|
||||
|
|
@ -437,7 +438,46 @@ describe("HubConnection", () => {
|
|||
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" });
|
||||
|
||||
expect(await invokePromise).toBe("foo");
|
||||
})
|
||||
});
|
||||
|
||||
it("does not terminate if messages are received", async () => {
|
||||
let connection = new TestConnection();
|
||||
let hubConnection = new HubConnection(connection, { serverTimeoutInMilliseconds: 100 });
|
||||
|
||||
let p = new PromiseSource<Error>();
|
||||
hubConnection.onclose(error => p.resolve(error));
|
||||
|
||||
await hubConnection.start();
|
||||
|
||||
await connection.receive({ type: MessageType.Ping });
|
||||
await delay(50);
|
||||
await connection.receive({ type: MessageType.Ping });
|
||||
await delay(50);
|
||||
await connection.receive({ type: MessageType.Ping });
|
||||
await delay(50);
|
||||
await connection.receive({ type: MessageType.Ping });
|
||||
await delay(50);
|
||||
|
||||
connection.stop();
|
||||
|
||||
let error = await p.promise;
|
||||
|
||||
expect(error).toBeUndefined();
|
||||
});
|
||||
|
||||
it("terminates if no messages received within timeout interval", async () => {
|
||||
let connection = new TestConnection();
|
||||
let hubConnection = new HubConnection(connection, { serverTimeoutInMilliseconds: 100 });
|
||||
|
||||
let p = new PromiseSource<Error>();
|
||||
hubConnection.onclose(error => p.resolve(error));
|
||||
|
||||
await hubConnection.start();
|
||||
|
||||
let error = await p.promise;
|
||||
|
||||
expect(error).toEqual(new Error("Server timeout elapsed without receiving a message from the server."));
|
||||
});
|
||||
})
|
||||
});
|
||||
|
||||
|
|
@ -463,9 +503,9 @@ class TestConnection implements IConnection {
|
|||
return Promise.resolve();
|
||||
};
|
||||
|
||||
stop(): void {
|
||||
stop(error?: Error): void {
|
||||
if (this.onclose) {
|
||||
this.onclose();
|
||||
this.onclose(error);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -505,26 +545,4 @@ class TestObserver implements Observer<any>
|
|||
complete() {
|
||||
this.itemsSource.resolve(this.itemsReceived);
|
||||
}
|
||||
};
|
||||
|
||||
class PromiseSource<T> {
|
||||
public promise: Promise<T>
|
||||
|
||||
private resolver: (value?: T | PromiseLike<T>) => void;
|
||||
private rejecter: (reason?: any) => void;
|
||||
|
||||
constructor() {
|
||||
this.promise = new Promise<T>((resolve, reject) => {
|
||||
this.resolver = resolve;
|
||||
this.rejecter = reject;
|
||||
});
|
||||
}
|
||||
|
||||
resolve(value?: T | PromiseLike<T>) {
|
||||
this.resolver(value);
|
||||
}
|
||||
|
||||
reject(reason?: any) {
|
||||
this.rejecter(reason);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -1,6 +1,8 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
import { clearTimeout, setTimeout } from "timers";
|
||||
|
||||
export function asyncit(expectation: string, assertion?: () => Promise<any>, timeout?: number): void {
|
||||
let testFunction: (done: DoneFn) => void;
|
||||
if (assertion) {
|
||||
|
|
@ -24,4 +26,32 @@ export async function captureException(fn: () => Promise<any>): Promise<Error> {
|
|||
} catch (e) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
|
||||
export function delay(durationInMilliseconds: number): Promise<void> {
|
||||
let source = new PromiseSource<void>();
|
||||
setTimeout(() => source.resolve(), durationInMilliseconds);
|
||||
return source.promise;
|
||||
}
|
||||
|
||||
export class PromiseSource<T> {
|
||||
public promise: Promise<T>
|
||||
|
||||
private resolver: (value?: T | PromiseLike<T>) => void;
|
||||
private rejecter: (reason?: any) => void;
|
||||
|
||||
constructor() {
|
||||
this.promise = new Promise<T>((resolve, reject) => {
|
||||
this.resolver = resolve;
|
||||
this.rejecter = reject;
|
||||
});
|
||||
}
|
||||
|
||||
resolve(value?: T | PromiseLike<T>) {
|
||||
this.resolver(value);
|
||||
}
|
||||
|
||||
reject(reason?: any) {
|
||||
this.rejecter(reason);
|
||||
}
|
||||
}
|
||||
|
|
@ -89,7 +89,7 @@ export class HttpConnection implements IConnection {
|
|||
? TransferMode.Binary
|
||||
: TransferMode.Text;
|
||||
|
||||
this.features.transferMode = await this.transport.connect(this.url, requestedTransferMode);
|
||||
this.features.transferMode = await this.transport.connect(this.url, requestedTransferMode, this);
|
||||
|
||||
// only change the state if we were connecting to not overwrite
|
||||
// the state if the connection is already marked as Disconnected
|
||||
|
|
@ -144,7 +144,7 @@ export class HttpConnection implements IConnection {
|
|||
return this.transport.send(data);
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
async stop(error? : Error): Promise<void> {
|
||||
let previousState = this.connectionState;
|
||||
this.connectionState = ConnectionState.Disconnected;
|
||||
|
||||
|
|
@ -154,10 +154,10 @@ export class HttpConnection implements IConnection {
|
|||
catch (e) {
|
||||
// this exception is returned to the user as a rejected Promise from the start method
|
||||
}
|
||||
this.stopConnection(/*raiseClosed*/ previousState == ConnectionState.Connected);
|
||||
this.stopConnection(/*raiseClosed*/ previousState == ConnectionState.Connected, error);
|
||||
}
|
||||
|
||||
private stopConnection(raiseClosed: Boolean, error?: any) {
|
||||
private stopConnection(raiseClosed: Boolean, error?: Error) {
|
||||
if (this.transport) {
|
||||
this.transport.stop();
|
||||
this.transport = null;
|
||||
|
|
@ -209,4 +209,4 @@ export class HttpConnection implements IConnection {
|
|||
|
||||
onreceive: DataReceived;
|
||||
onclose: ConnectionClosed;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import { Base64EncodedHubProtocol } from "./Base64EncodedHubProtocol"
|
|||
import { ILogger, LogLevel } from "./ILogger"
|
||||
import { ConsoleLogger, NullLogger, LoggerFactory } from "./Loggers"
|
||||
import { IHubConnectionOptions } from "./IHubConnectionOptions"
|
||||
import { setTimeout, clearTimeout } from "timers";
|
||||
|
||||
export { TransportType } from "./Transports"
|
||||
export { HttpConnection } from "./HttpConnection"
|
||||
|
|
@ -20,6 +21,8 @@ export { JsonHubProtocol } from "./JsonHubProtocol"
|
|||
export { LogLevel, ILogger } from "./ILogger"
|
||||
export { ConsoleLogger, NullLogger } from "./Loggers"
|
||||
|
||||
const DEFAULT_SERVER_TIMEOUT_IN_MS: number = 30 * 1000;
|
||||
|
||||
export class HubConnection {
|
||||
private readonly connection: IConnection;
|
||||
private readonly logger: ILogger;
|
||||
|
|
@ -28,9 +31,14 @@ export class HubConnection {
|
|||
private methods: Map<string, ((...args: any[]) => void)[]>;
|
||||
private id: number;
|
||||
private closedCallbacks: ConnectionClosed[];
|
||||
private timeoutHandle: NodeJS.Timer;
|
||||
private serverTimeoutInMilliseconds: number;
|
||||
|
||||
constructor(urlOrConnection: string | IConnection, options: IHubConnectionOptions = {}) {
|
||||
options = options || {};
|
||||
|
||||
this.serverTimeoutInMilliseconds = options.serverTimeoutInMilliseconds || DEFAULT_SERVER_TIMEOUT_IN_MS;
|
||||
|
||||
if (typeof urlOrConnection === "string") {
|
||||
this.connection = new HttpConnection(urlOrConnection, options);
|
||||
}
|
||||
|
|
@ -51,6 +59,10 @@ export class HubConnection {
|
|||
}
|
||||
|
||||
private processIncomingData(data: any) {
|
||||
if (this.timeoutHandle !== undefined) {
|
||||
clearTimeout(this.timeoutHandle);
|
||||
}
|
||||
|
||||
// Parse the messages
|
||||
let messages = this.protocol.parseMessages(data);
|
||||
|
||||
|
|
@ -79,6 +91,21 @@ export class HubConnection {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.configureTimeout();
|
||||
}
|
||||
|
||||
private configureTimeout() {
|
||||
if (!this.connection.features || !this.connection.features.inherentKeepAlive) {
|
||||
// Set the timeout timer
|
||||
this.timeoutHandle = setTimeout(() => this.serverTimeout(), this.serverTimeoutInMilliseconds);
|
||||
}
|
||||
}
|
||||
|
||||
private serverTimeout() {
|
||||
// The server hasn't talked to us in a while. It doesn't like us anymore ... :(
|
||||
// Terminate the connection
|
||||
this.connection.stop(new Error("Server timeout elapsed without receiving a message from the server."));
|
||||
}
|
||||
|
||||
private invokeClientMethod(invocationMessage: InvocationMessage) {
|
||||
|
|
@ -122,6 +149,8 @@ export class HubConnection {
|
|||
if (requestedTransferMode === TransferMode.Binary && actualTransferMode === TransferMode.Text) {
|
||||
this.protocol = new Base64EncodedHubProtocol(this.protocol);
|
||||
}
|
||||
|
||||
this.configureTimeout();
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ export interface IConnection {
|
|||
|
||||
start(): Promise<void>;
|
||||
send(data: any): Promise<void>;
|
||||
stop(): void;
|
||||
stop(error?: Error): void;
|
||||
|
||||
onreceive: DataReceived;
|
||||
onclose: ConnectionClosed;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,4 +7,5 @@ import { ILogger, LogLevel } from "./ILogger"
|
|||
|
||||
export interface IHubConnectionOptions extends IHttpConnectionOptions {
|
||||
protocol?: IHubProtocol;
|
||||
serverTimeoutInMilliseconds?: number;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { DataReceived, TransportClosed } from "./Common"
|
|||
import { IHttpClient } from "./HttpClient"
|
||||
import { HttpError } from "./HttpError"
|
||||
import { ILogger, LogLevel } from "./ILogger"
|
||||
import { IConnection } from "./IConnection"
|
||||
|
||||
export enum TransportType {
|
||||
WebSockets,
|
||||
|
|
@ -18,7 +19,7 @@ export const enum TransferMode {
|
|||
}
|
||||
|
||||
export interface ITransport {
|
||||
connect(url: string, requestedTransferMode: TransferMode): Promise<TransferMode>;
|
||||
connect(url: string, requestedTransferMode: TransferMode, connection: IConnection): Promise<TransferMode>;
|
||||
send(data: any): Promise<void>;
|
||||
stop(): void;
|
||||
onreceive: DataReceived;
|
||||
|
|
@ -35,7 +36,7 @@ export class WebSocketTransport implements ITransport {
|
|||
this.jwtBearer = jwtBearer;
|
||||
}
|
||||
|
||||
connect(url: string, requestedTransferMode: TransferMode): Promise<TransferMode> {
|
||||
connect(url: string, requestedTransferMode: TransferMode, connection: IConnection): Promise<TransferMode> {
|
||||
|
||||
return new Promise<TransferMode>((resolve, reject) => {
|
||||
url = url.replace(/^http/, "ws");
|
||||
|
|
@ -113,7 +114,7 @@ export class ServerSentEventsTransport implements ITransport {
|
|||
this.logger = logger;
|
||||
}
|
||||
|
||||
connect(url: string, requestedTransferMode: TransferMode): Promise<TransferMode> {
|
||||
connect(url: string, requestedTransferMode: TransferMode, connection: IConnection): Promise<TransferMode> {
|
||||
if (typeof (EventSource) === "undefined") {
|
||||
Promise.reject("EventSource not supported by the browser.");
|
||||
}
|
||||
|
|
@ -194,10 +195,13 @@ export class LongPollingTransport implements ITransport {
|
|||
this.logger = logger;
|
||||
}
|
||||
|
||||
connect(url: string, requestedTransferMode: TransferMode): Promise<TransferMode> {
|
||||
connect(url: string, requestedTransferMode: TransferMode, connection: IConnection): Promise<TransferMode> {
|
||||
this.url = url;
|
||||
this.shouldPoll = true;
|
||||
|
||||
// Set a flag indicating we have inherent keep-alive in this transport.
|
||||
connection.features.inherentKeepAlive = true;
|
||||
|
||||
if (requestedTransferMode === TransferMode.Binary && (typeof new XMLHttpRequest().responseType !== "string")) {
|
||||
// This will work if we fix: https://github.com/aspnet/SignalR/issues/742
|
||||
throw new Error("Binary protocols over XmlHttpRequest not implementing advanced features are not supported.");
|
||||
|
|
@ -300,4 +304,4 @@ async function send(httpClient: IHttpClient, url: string, jwtBearer: () => strin
|
|||
}
|
||||
|
||||
await httpClient.post(url, data, headers);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ describe('hubConnection', function () {
|
|||
});
|
||||
|
||||
hubConnection.start().then(function () {
|
||||
hubConnection.send('SendCustomObject', { Name: 'test', Value: 42});
|
||||
hubConnection.send('SendCustomObject', { Name: 'test', Value: 42 });
|
||||
}).catch(function (e) {
|
||||
fail(e);
|
||||
done();
|
||||
|
|
@ -258,7 +258,7 @@ describe('hubConnection', function () {
|
|||
hubConnection.start().then(function () {
|
||||
return hubConnection.invoke('InvokeWithString', message);
|
||||
})
|
||||
.then(function() {
|
||||
.then(function () {
|
||||
return hubConnection.stop();
|
||||
})
|
||||
.catch(function (e) {
|
||||
|
|
@ -367,18 +367,42 @@ describe('hubConnection', function () {
|
|||
});
|
||||
return hubConnection.start();
|
||||
})
|
||||
.then(function() {
|
||||
.then(function () {
|
||||
return hubConnection.invoke('Echo', message);
|
||||
})
|
||||
.then(function(response) {
|
||||
.then(function (response) {
|
||||
expect(response).toEqual(message);
|
||||
return hubConnection.stop();
|
||||
})
|
||||
.catch(function(e) {
|
||||
.catch(function (e) {
|
||||
fail(e);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
if (transportType != signalR.TransportType.LongPolling) {
|
||||
it("terminates if no messages received within timeout interval", function (done) {
|
||||
var options = {
|
||||
transport: transportType,
|
||||
logging: signalR.LogLevel.Trace,
|
||||
serverTimeoutInMilliseconds: 100
|
||||
};
|
||||
|
||||
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
|
||||
|
||||
var timeout = setTimeout(200, function () {
|
||||
fail("Server timeout did not fire within expected interval");
|
||||
});
|
||||
|
||||
hubConnection.start().then(function () {
|
||||
hubConnection.onclose(function (error) {
|
||||
clearTimeout(timeout);
|
||||
expect(error).toEqual(new Error("Server timeout elapsed without receiving a message from the server."));
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue