diff --git a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs index 9c39d97d6e..8a00db96f1 100644 --- a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs +++ b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs @@ -3,6 +3,7 @@ using System; using System.Reactive.Linq; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Connections; @@ -20,6 +21,13 @@ namespace FunctionalTests public class TestHub : Hub { + private readonly IHubContext _context; + + public TestHub(IHubContext context) + { + _context = context; + } + public string Echo(string message) { return message; @@ -50,6 +58,19 @@ namespace FunctionalTests return channel.Reader; } + public ChannelReader InfiniteStream(CancellationToken token) + { + var channel = Channel.CreateUnbounded(); + var connectionId = Context.ConnectionId; + + token.Register(async (state) => + { + await ((IHubContext)state).Clients.Client(connectionId).SendAsync("StreamCanceled"); + }, _context); + + return channel.Reader; + } + public ChannelReader EmptyStream() { var channel = Channel.CreateUnbounded(); diff --git a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts index b7665cb306..93461c7ede 100644 --- a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts +++ b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts @@ -171,6 +171,39 @@ describe("hubConnection", () => { }); }); + it("can stream server method and cancel stream", (done) => { + const hubConnection = getConnectionBuilder(transportType) + .withHubProtocol(protocol) + .build(); + + hubConnection.onclose((error) => { + expect(error).toBe(undefined); + done(); + }); + + hubConnection.on("StreamCanceled", () => { + hubConnection.stop(); + }); + + hubConnection.start().then(() => { + const subscription = hubConnection.stream("InfiniteStream").subscribe({ + complete() { + }, + error(err) { + fail(err); + hubConnection.stop(); + }, + next() { + }, + }); + + subscription.dispose(); + }).catch((e) => { + fail(e); + done(); + }); + }); + it("rethrows an exception from the server when invoking", (done) => { const errorMessage = "An unexpected error occurred invoking 'ThrowException' on the server. InvalidOperationException: An error occurred."; const hubConnection = getConnectionBuilder(transportType) diff --git a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts index 7ddc48e2e4..252af5fc3c 100644 --- a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts +++ b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts @@ -4,7 +4,8 @@ import { Buffer } from "buffer"; import * as msgpack5 from "msgpack5"; -import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr"; +import { CancelInvocationMessage, CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, + LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr"; import { BinaryMessageFormat } from "./BinaryMessageFormat"; import { isArrayBuffer } from "./Utils"; @@ -70,6 +71,8 @@ export class MessagePackHubProtocol implements IHubProtocol { throw new Error(`Writing messages of type '${message.type}' is not supported.`); case MessageType.Ping: return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE); + case MessageType.CancelInvocation: + return this.writeCancelInvocation(message as CancelInvocationMessage); default: throw new Error("Invalid message type."); } @@ -226,6 +229,13 @@ export class MessagePackHubProtocol implements IHubProtocol { return BinaryMessageFormat.write(payload.slice()); } + private writeCancelInvocation(cancelInvocationMessage: CancelInvocationMessage): ArrayBuffer { + const msgpack = msgpack5(); + const payload = msgpack.encode([MessageType.CancelInvocation, cancelInvocationMessage.headers || {}, cancelInvocationMessage.invocationId]); + + return BinaryMessageFormat.write(payload.slice()); + } + private readHeaders(properties: any): MessageHeaders { const headers: MessageHeaders = properties[1] as MessageHeaders; if (typeof headers !== "object") { diff --git a/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts b/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts index e22dc33fe5..8c0923ef20 100644 --- a/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts +++ b/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts @@ -198,4 +198,19 @@ describe("MessagePackHubProtocol", () => { const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.Ping }); expect(new Uint8Array(buffer)).toEqual(payload); }); + + it("can write cancel message", () => { + const payload = new Uint8Array([ + 0x07, // length prefix + 0x93, // message array length = 1 (fixarray) + 0x05, // type = 5 = CancelInvocation (fixnum) + 0x80, // headers + 0xa3, // invocationID = string length 3 + 0x61, // a + 0x62, // b + 0x63, // c + ]); + const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.CancelInvocation, invocationId: "abc" }); + expect(new Uint8Array(buffer)).toEqual(payload); + }); }); diff --git a/src/SignalR/clients/ts/signalr/src/HubConnection.ts b/src/SignalR/clients/ts/signalr/src/HubConnection.ts index 2ec156a7da..7eeae1deee 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnection.ts @@ -154,14 +154,18 @@ export class HubConnection { public stream(methodName: string, ...args: any[]): IStreamResult { const invocationDescriptor = this.createStreamInvocation(methodName, args); - const subject = new Subject(() => { + let promiseQueue: Promise; + const subject = new Subject(); + subject.cancelCallback = () => { const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId); const cancelMessage: any = this.protocol.writeMessage(cancelInvocation); delete this.callbacks[invocationDescriptor.invocationId]; - return this.sendMessage(cancelMessage); - }); + return promiseQueue.then(() => { + return this.sendMessage(cancelMessage); + }); + }; this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => { if (error) { @@ -183,7 +187,7 @@ export class HubConnection { const message = this.protocol.writeMessage(invocationDescriptor); - this.sendMessage(message) + promiseQueue = this.sendMessage(message) .catch((e) => { subject.error(e); delete this.callbacks[invocationDescriptor.invocationId]; diff --git a/src/SignalR/clients/ts/signalr/src/Utils.ts b/src/SignalR/clients/ts/signalr/src/Utils.ts index 6e0ae91862..b1fb779c15 100644 --- a/src/SignalR/clients/ts/signalr/src/Utils.ts +++ b/src/SignalR/clients/ts/signalr/src/Utils.ts @@ -107,11 +107,10 @@ export function createLogger(logger?: ILogger | LogLevel) { /** @private */ export class Subject implements IStreamResult { public observers: Array>; - public cancelCallback: () => Promise; + public cancelCallback?: () => Promise; - constructor(cancelCallback: () => Promise) { + constructor() { this.observers = []; - this.cancelCallback = cancelCallback; } public next(item: T): void { @@ -158,7 +157,7 @@ export class SubjectSubscription implements ISubscription { this.subject.observers.splice(index, 1); } - if (this.subject.observers.length === 0) { + if (this.subject.observers.length === 0 && this.subject.cancelCallback) { this.subject.cancelCallback().catch((_) => { }); } } diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts index 04b889b59d..dca9c504e0 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts @@ -12,7 +12,7 @@ import { IStreamSubscriber } from "../src/Stream"; import { TextMessageFormat } from "../src/TextMessageFormat"; import { VerifyLogger } from "./Common"; -import { delay, PromiseSource, registerUnhandledRejectionHandler } from "./Utils"; +import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils"; function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) { return HubConnection.create(connection, logger || NullLogger.instance, protocol || new JsonHubProtocol()); @@ -65,7 +65,7 @@ describe("HubConnection", () => { try { await hubConnection.start(); - await delay(500); + await delayUntil(500); const numPings = connection.sentData.filter((s) => JSON.parse(s).type === MessageType.Ping).length; expect(numPings).toBeGreaterThanOrEqual(2); @@ -953,6 +953,8 @@ describe("HubConnection", () => { // Observer should no longer receive messages expect(observer.itemsReceived).toEqual([1]); + // Close message sent asynchronously so we need to wait + await delayUntil(1000, () => connection.sentData.length === 3); // Verify the cancel is sent (+ handshake) expect(connection.sentData.length).toBe(3); expect(JSON.parse(connection.sentData[2])).toEqual({ @@ -1061,14 +1063,14 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - hubConnection.serverTimeoutInMilliseconds = 200; + hubConnection.serverTimeoutInMilliseconds = 400; const p = new PromiseSource(); hubConnection.onclose((e) => p.resolve(e)); await hubConnection.start(); - for (let i = 0; i < 6; i++) { + for (let i = 0; i < 12; i++) { await pingAndWait(connection); } @@ -1108,7 +1110,7 @@ describe("HubConnection", () => { async function pingAndWait(connection: TestConnection): Promise { await connection.receive({ type: MessageType.Ping }); - await delay(50); + await delayUntil(50); } class TestConnection implements IConnection { diff --git a/src/SignalR/clients/ts/signalr/tests/Utils.ts b/src/SignalR/clients/ts/signalr/tests/Utils.ts index a60dfa269b..a5bc9829bb 100644 --- a/src/SignalR/clients/ts/signalr/tests/Utils.ts +++ b/src/SignalR/clients/ts/signalr/tests/Utils.ts @@ -13,9 +13,24 @@ export function registerUnhandledRejectionHandler(): void { }); } -export function delay(durationInMilliseconds: number): Promise { +export function delayUntil(timeoutInMilliseconds: number, condition?: () => boolean): Promise { const source = new PromiseSource(); - setTimeout(() => source.resolve(), durationInMilliseconds); + let timeWait: number = 0; + const interval = setInterval(() => { + timeWait += 10; + if (condition) { + if (condition() === true) { + source.resolve(); + clearInterval(interval); + } else if (timeoutInMilliseconds <= timeWait) { + source.reject(new Error("Timed out waiting for condition")); + clearInterval(interval); + } + } else if (timeoutInMilliseconds <= timeWait) { + source.resolve(); + clearInterval(interval); + } + }, 10); return source.promise; }