From 01c5add273458bcd81f53aeeaba8601d8f78bbb8 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 8 Feb 2019 14:31:19 -0800 Subject: [PATCH] Add CancelInvocation support to MsgPack in TS client (#7224) --- .../clients/ts/FunctionalTests/TestHub.cs | 21 ++++++++++++ .../FunctionalTests/ts/HubConnectionTests.ts | 33 +++++++++++++++++++ .../src/MessagePackHubProtocol.ts | 12 ++++++- .../tests/MessagePackHubProtocol.test.ts | 15 +++++++++ .../clients/ts/signalr/src/HubConnection.ts | 7 ++-- .../ts/signalr/tests/HubConnection.test.ts | 12 ++++--- src/SignalR/clients/ts/signalr/tests/Utils.ts | 19 +++++++++-- 7 files changed, 109 insertions(+), 10 deletions(-) diff --git a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs index e6bea7e82b..fce457491e 100644 --- a/src/SignalR/clients/ts/FunctionalTests/TestHub.cs +++ b/src/SignalR/clients/ts/FunctionalTests/TestHub.cs @@ -4,6 +4,7 @@ using System; using System.Reactive.Linq; using System.Text; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Connections; @@ -21,6 +22,13 @@ namespace FunctionalTests public class TestHub : Hub { + private readonly IHubContext _context; + + public TestHub(IHubContext context) + { + _context = context; + } + public string Echo(string message) { return message; @@ -51,6 +59,19 @@ namespace FunctionalTests return channel.Reader; } + public ChannelReader InfiniteStream(CancellationToken token) + { + var channel = Channel.CreateUnbounded(); + var connectionId = Context.ConnectionId; + + token.Register(async (state) => + { + await ((IHubContext)state).Clients.Client(connectionId).SendAsync("StreamCanceled"); + }, _context); + + return channel.Reader; + } + public async Task StreamingConcat(ChannelReader stream) { var sb = new StringBuilder(); diff --git a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts index e212ad9a7f..8106adcf85 100644 --- a/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts +++ b/src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts @@ -184,6 +184,39 @@ describe("hubConnection", () => { }); }); + it("can stream server method and cancel stream", (done) => { + const hubConnection = getConnectionBuilder(transportType) + .withHubProtocol(protocol) + .build(); + + hubConnection.onclose((error) => { + expect(error).toBe(undefined); + done(); + }); + + hubConnection.on("StreamCanceled", () => { + hubConnection.stop(); + }); + + hubConnection.start().then(() => { + const subscription = hubConnection.stream("InfiniteStream").subscribe({ + complete() { + }, + error(err) { + fail(err); + hubConnection.stop(); + }, + next() { + }, + }); + + subscription.dispose(); + }).catch((e) => { + fail(e); + done(); + }); + }); + it("rethrows an exception from the server when invoking", (done) => { const errorMessage = "An unexpected error occurred invoking 'ThrowException' on the server. InvalidOperationException: An error occurred."; const hubConnection = getConnectionBuilder(transportType) diff --git a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts index c6934fb77b..2a6f154326 100644 --- a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts +++ b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts @@ -4,7 +4,8 @@ import { Buffer } from "buffer"; import * as msgpack5 from "msgpack5"; -import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr"; +import { CancelInvocationMessage, CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, + LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr"; import { BinaryMessageFormat } from "./BinaryMessageFormat"; import { isArrayBuffer } from "./Utils"; @@ -75,6 +76,8 @@ export class MessagePackHubProtocol implements IHubProtocol { return this.writeCompletion(message as CompletionMessage); case MessageType.Ping: return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE); + case MessageType.CancelInvocation: + return this.writeCancelInvocation(message as CancelInvocationMessage); default: throw new Error("Invalid message type."); } @@ -257,6 +260,13 @@ export class MessagePackHubProtocol implements IHubProtocol { return BinaryMessageFormat.write(payload.slice()); } + private writeCancelInvocation(cancelInvocationMessage: CancelInvocationMessage): ArrayBuffer { + const msgpack = msgpack5(); + const payload = msgpack.encode([MessageType.CancelInvocation, cancelInvocationMessage.headers || {}, cancelInvocationMessage.invocationId]); + + return BinaryMessageFormat.write(payload.slice()); + } + private readHeaders(properties: any): MessageHeaders { const headers: MessageHeaders = properties[1] as MessageHeaders; if (typeof headers !== "object") { diff --git a/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts b/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts index e406614fa0..69b2ba6437 100644 --- a/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts +++ b/src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts @@ -202,4 +202,19 @@ describe("MessagePackHubProtocol", () => { const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.Ping }); expect(new Uint8Array(buffer)).toEqual(payload); }); + + it("can write cancel message", () => { + const payload = new Uint8Array([ + 0x07, // length prefix + 0x93, // message array length = 1 (fixarray) + 0x05, // type = 5 = CancelInvocation (fixnum) + 0x80, // headers + 0xa3, // invocationID = string length 3 + 0x61, // a + 0x62, // b + 0x63, // c + ]); + const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.CancelInvocation, invocationId: "abc" }); + expect(new Uint8Array(buffer)).toEqual(payload); + }); }); diff --git a/src/SignalR/clients/ts/signalr/src/HubConnection.ts b/src/SignalR/clients/ts/signalr/src/HubConnection.ts index 9ad3e7032b..1c6847733a 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnection.ts @@ -156,13 +156,16 @@ export class HubConnection { const [streams, streamIds] = this.replaceStreamingParams(args); const invocationDescriptor = this.createStreamInvocation(methodName, args, streamIds); + let promiseQueue: Promise; const subject = new Subject(); subject.cancelCallback = () => { const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId); delete this.callbacks[invocationDescriptor.invocationId]; - return this.sendWithProtocol(cancelInvocation); + return promiseQueue.then(() => { + return this.sendWithProtocol(cancelInvocation); + }); }; this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => { @@ -183,7 +186,7 @@ export class HubConnection { } }; - const promiseQueue = this.sendWithProtocol(invocationDescriptor) + promiseQueue = this.sendWithProtocol(invocationDescriptor) .catch((e) => { subject.error(e); delete this.callbacks[invocationDescriptor.invocationId]; diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts index c70d51baad..4f5d24ee9c 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts @@ -13,7 +13,7 @@ import { Subject } from "../src/Subject"; import { TextMessageFormat } from "../src/TextMessageFormat"; import { VerifyLogger } from "./Common"; -import { delay, PromiseSource, registerUnhandledRejectionHandler } from "./Utils"; +import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils"; function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) { return HubConnection.create(connection, logger || NullLogger.instance, protocol || new JsonHubProtocol()); @@ -66,7 +66,7 @@ describe("HubConnection", () => { try { await hubConnection.start(); - await delay(500); + await delayUntil(500); const numPings = connection.sentData.filter((s) => JSON.parse(s).type === MessageType.Ping).length; expect(numPings).toBeGreaterThanOrEqual(2); @@ -1075,6 +1075,8 @@ describe("HubConnection", () => { // Observer should no longer receive messages expect(observer.itemsReceived).toEqual([1]); + // Close message sent asynchronously so we need to wait + await delayUntil(1000, () => connection.sentData.length === 3); // Verify the cancel is sent (+ handshake) expect(connection.sentData.length).toBe(3); expect(JSON.parse(connection.sentData[2])).toEqual({ @@ -1183,14 +1185,14 @@ describe("HubConnection", () => { const connection = new TestConnection(); const hubConnection = createHubConnection(connection, logger); try { - hubConnection.serverTimeoutInMilliseconds = 200; + hubConnection.serverTimeoutInMilliseconds = 400; const p = new PromiseSource(); hubConnection.onclose((e) => p.resolve(e)); await hubConnection.start(); - for (let i = 0; i < 6; i++) { + for (let i = 0; i < 12; i++) { await pingAndWait(connection); } @@ -1230,7 +1232,7 @@ describe("HubConnection", () => { async function pingAndWait(connection: TestConnection): Promise { await connection.receive({ type: MessageType.Ping }); - await delay(50); + await delayUntil(50); } class TestConnection implements IConnection { diff --git a/src/SignalR/clients/ts/signalr/tests/Utils.ts b/src/SignalR/clients/ts/signalr/tests/Utils.ts index a60dfa269b..a5bc9829bb 100644 --- a/src/SignalR/clients/ts/signalr/tests/Utils.ts +++ b/src/SignalR/clients/ts/signalr/tests/Utils.ts @@ -13,9 +13,24 @@ export function registerUnhandledRejectionHandler(): void { }); } -export function delay(durationInMilliseconds: number): Promise { +export function delayUntil(timeoutInMilliseconds: number, condition?: () => boolean): Promise { const source = new PromiseSource(); - setTimeout(() => source.resolve(), durationInMilliseconds); + let timeWait: number = 0; + const interval = setInterval(() => { + timeWait += 10; + if (condition) { + if (condition() === true) { + source.resolve(); + clearInterval(interval); + } else if (timeoutInMilliseconds <= timeWait) { + source.reject(new Error("Timed out waiting for condition")); + clearInterval(interval); + } + } else if (timeoutInMilliseconds <= timeWait) { + source.resolve(); + clearInterval(interval); + } + }, 10); return source.promise; }