From 6bddb258cd5354c8358dd512f3403a335b460baf Mon Sep 17 00:00:00 2001 From: Pawel Kadluczka Date: Tue, 17 Oct 2017 13:12:09 -0700 Subject: [PATCH] Adding StreamCompletion message (TS) --- .../HubConnection.spec.ts | 64 ++++++----- .../HubConnection.ts | 61 ++++++----- .../IHubProtocol.ts | 7 +- .../MessagePackHubProtocol.ts | 18 +++- .../TestHub.cs | 10 ++ .../wwwroot/js/hubConnectionTests.js | 101 +++++++++++++++--- 6 files changed, 193 insertions(+), 68 deletions(-) diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts index 32f7deda38..184e455832 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts @@ -8,6 +8,7 @@ import { TransportType, ITransport, TransferMode } from "../Microsoft.AspNetCore import { Observer } from "../Microsoft.AspNetCore.SignalR.Client.TS/Observable" import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters" import { ILogger, LogLevel } from "../Microsoft.AspNetCore.SignalR.Client.TS/ILogger" +import { MessageType } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol" import { asyncit as it, captureException } from './JasmineUtils'; @@ -37,7 +38,7 @@ describe("HubConnection", () => { // Verify the message is sent expect(connection.sentData.length).toBe(1); expect(JSON.parse(connection.sentData[0])).toEqual({ - type: 1, + type: MessageType.Invocation, invocationId: connection.lastInvocationId, target: "testMethod", nonblocking: true, @@ -63,7 +64,7 @@ describe("HubConnection", () => { // Verify the message is sent expect(connection.sentData.length).toBe(1); expect(JSON.parse(connection.sentData[0])).toEqual({ - type: 1, + type: MessageType.Invocation, invocationId: connection.lastInvocationId, target: "testMethod", nonblocking: false, @@ -83,7 +84,7 @@ describe("HubConnection", () => { let hubConnection = new HubConnection(connection); let invokePromise = hubConnection.invoke("testMethod", "arg", 42); - connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" }); + connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, error: "foo" }); let ex = await captureException(async () => invokePromise); expect(ex.message).toBe("foo"); @@ -95,7 +96,7 @@ describe("HubConnection", () => { let hubConnection = new HubConnection(connection); let invokePromise = hubConnection.invoke("testMethod", "arg", 42); - connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" }); + connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" }); expect(await invokePromise).toBe("foo"); }); @@ -108,7 +109,7 @@ describe("HubConnection", () => { hubConnection.stop(); let ex = await captureException(async () => await invokePromise); - expect(ex.message).toBe("Invocation cancelled due to connection being closed."); + expect(ex.message).toBe("Invocation canceled due to connection being closed."); }); it("completes pending invocations when connection is lost", async () => { @@ -123,17 +124,30 @@ describe("HubConnection", () => { expect(ex.message).toBe("Connection lost"); }); - it("rejects streaming responses made using 'invoke'", async () => { + it("rejects streaming results made using 'invoke'", async () => { let connection = new TestConnection(); let hubConnection = new HubConnection(connection); let invokePromise = hubConnection.invoke("testMethod"); - connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: null }); + connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: null }); connection.onclose(); let ex = await captureException(async () => await invokePromise); - expect(ex.message).toBe("Streaming methods must be invoked using HubConnection.stream"); + expect(ex.message).toBe("Streaming methods must be invoked using the 'HubConnection.stream()' method."); + }); + + it("rejects streaming completions made using 'invoke'", async () => { + let connection = new TestConnection(); + + let hubConnection = new HubConnection(connection); + let invokePromise = hubConnection.invoke("testMethod"); + + connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId }); + connection.onclose(); + + let ex = await captureException(async () => await invokePromise); + expect(ex.message).toBe("Streaming methods must be invoked using the 'HubConnection.stream()' method."); }); }); @@ -151,7 +165,7 @@ describe("HubConnection", () => { let hubConnection = new HubConnection(connection, { logging: logger }); connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: ["test"], @@ -168,7 +182,7 @@ describe("HubConnection", () => { hubConnection.on("message", v => value = v); connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: ["test"], @@ -187,7 +201,7 @@ describe("HubConnection", () => { hubConnection.on("message", () => numInvocations2++); connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: [], @@ -207,7 +221,7 @@ describe("HubConnection", () => { hubConnection.on("message", callback); connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: [], @@ -217,7 +231,7 @@ describe("HubConnection", () => { hubConnection.off("message", callback); connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: [], @@ -259,7 +273,7 @@ describe("HubConnection", () => { // invoke a method to make sure we are not trying to use null/undefined connection.receive({ - type: 1, + type: MessageType.Invocation, invocationId: 0, target: "message", arguments: [], @@ -287,7 +301,7 @@ describe("HubConnection", () => { // Verify the message is sent expect(connection.sentData.length).toBe(1); expect(JSON.parse(connection.sentData[0])).toEqual({ - type: 1, + type: MessageType.Invocation, invocationId: connection.lastInvocationId, target: "testStream", nonblocking: false, @@ -309,7 +323,7 @@ describe("HubConnection", () => { hubConnection.stream("testMethod", "arg", 42) .subscribe(observer); - connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" }); + connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId, error: "foo" }); let ex = await captureException(async () => await observer.completed); expect(ex.message).toEqual("Error: foo"); @@ -323,7 +337,7 @@ describe("HubConnection", () => { hubConnection.stream("testMethod", "arg", 42) .subscribe(observer); - connection.receive({ type: 3, invocationId: connection.lastInvocationId }); + connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId }); expect(await observer.completed).toEqual([]); }); @@ -338,7 +352,7 @@ describe("HubConnection", () => { hubConnection.stop(); let ex = await captureException(async () => await observer.completed); - expect(ex.message).toEqual("Error: Invocation cancelled due to connection being closed."); + expect(ex.message).toEqual("Error: Invocation canceled due to connection being closed."); }); it("completes pending streams when connection is lost", async () => { @@ -364,10 +378,10 @@ describe("HubConnection", () => { hubConnection.stream("testMethod") .subscribe(observer); - connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" }); + connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" }); let ex = await captureException(async () => await observer.completed); - expect(ex.message).toEqual("Error: Server provided a result in a completion response to a streamed invocation."); + expect(ex.message).toEqual("Error: Hub methods must be invoked using the 'HubConnection.invoke()' method."); }); it("yields items as they arrive", async () => { @@ -378,16 +392,16 @@ describe("HubConnection", () => { hubConnection.stream("testMethod") .subscribe(observer); - connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 1 }); + connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 1 }); expect(observer.itemsReceived).toEqual([1]); - connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 2 }); + connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 2 }); expect(observer.itemsReceived).toEqual([1, 2]); - connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 3 }); + connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 3 }); expect(observer.itemsReceived).toEqual([1, 2, 3]); - connection.receive({ type: 3, invocationId: connection.lastInvocationId }); + connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId }); expect(await observer.completed).toEqual([1, 2, 3]); }); @@ -414,7 +428,7 @@ describe("HubConnection", () => { // Send completion to trigger observer.complete() // Expectation is connection.receive will not to throw - connection.receive({ type: 3, invocationId: connection.lastInvocationId }); + connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId }); }); }); diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts index a59571ef37..ae87bda2dd 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts @@ -6,7 +6,7 @@ import { IConnection } from "./IConnection" import { HttpConnection} from "./HttpConnection" import { TransportType, TransferMode } from "./Transports" import { Subject, Observable } from "./Observable" -import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, ResultMessage, InvocationMessage, NegotiationMessage } from "./IHubProtocol"; +import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, StreamCompletionMessage, ResultMessage, InvocationMessage, NegotiationMessage } from "./IHubProtocol"; import { JsonHubProtocol } from "./JsonHubProtocol"; import { TextMessageFormat } from "./Formatters" import { Base64EncodedHubProtocol } from "./Base64EncodedHubProtocol" @@ -24,7 +24,7 @@ export class HubConnection { private readonly connection: IConnection; private readonly logger: ILogger; private protocol: IHubProtocol; - private callbacks: Map void>; + private callbacks: Map void>; private methods: Map void)[]>; private id: number; private closedCallbacks: ConnectionClosed[]; @@ -44,7 +44,7 @@ export class HubConnection { this.connection.onreceive = (data: any) => this.processIncomingData(data); this.connection.onclose = (error?: Error) => this.connectionClosed(error); - this.callbacks = new Map void>(); + this.callbacks = new Map void>(); this.methods = new Map void)[]>(); this.closedCallbacks = []; this.id = 0; @@ -63,9 +63,10 @@ export class HubConnection { break; case MessageType.Result: case MessageType.Completion: + case MessageType.StreamCompletion: let callback = this.callbacks.get(message.invocationId); if (callback != null) { - if (message.type == MessageType.Completion) { + if (message.type == MessageType.Completion || message.type == MessageType.StreamCompletion) { this.callbacks.delete(message.invocationId); } callback(message); @@ -92,14 +93,8 @@ export class HubConnection { } private connectionClosed(error?: Error) { - let errorCompletionMessage = { - type: MessageType.Completion, - invocationId: "-1", - error: error ? error.message : "Invocation cancelled due to connection being closed.", - }; - this.callbacks.forEach(callback => { - callback(errorCompletionMessage); + callback(undefined, error ? error : new Error("Invocation canceled due to connection being closed.")); }); this.callbacks.clear(); @@ -136,22 +131,28 @@ export class HubConnection { let subject = new Subject(); - this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => { - if (invocationEvent.type === MessageType.Completion) { - let completionMessage = invocationEvent; - if (completionMessage.error) { - subject.error(new Error(completionMessage.error)); - } - else if (completionMessage.result) { - subject.error(new Error("Server provided a result in a completion response to a streamed invocation.")); - } - else { - // TODO: Log a warning if there's a payload? - subject.complete(); - } + this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: HubMessage, error?: Error) => { + if (error) { + subject.error(error); + return; } - else { - subject.next((invocationEvent).item); + + switch (invocationEvent.type) { + case MessageType.StreamCompletion: + let completionMessage = invocationEvent; + if (completionMessage.error) { + subject.error(new Error(completionMessage.error)); + } + else { + subject.complete(); + } + break; + case MessageType.Result: + subject.next((invocationEvent).item); + break; + default: + subject.error(new Error("Hub methods must be invoked using the 'HubConnection.invoke()' method.")); + break; } }); @@ -178,7 +179,11 @@ export class HubConnection { let invocationDescriptor = this.createInvocation(methodName, args, false); let p = new Promise((resolve, reject) => { - this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => { + this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: HubMessage, error?: Error) => { + if (error) { + reject(error); + return; + } if (invocationEvent.type === MessageType.Completion) { let completionMessage = invocationEvent; if (completionMessage.error) { @@ -189,7 +194,7 @@ export class HubConnection { } } else { - reject(new Error("Streaming methods must be invoked using HubConnection.stream")) + reject(new Error("Streaming methods must be invoked using the 'HubConnection.stream()' method.")); } }); diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol.ts index d8a1da9668..63961ef8a2 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol.ts @@ -4,7 +4,8 @@ export const enum MessageType { Invocation = 1, Result, - Completion + Completion, + StreamCompletion } export interface HubMessage { @@ -22,6 +23,10 @@ export interface ResultMessage extends HubMessage { readonly item?: any; } +export interface StreamCompletionMessage extends HubMessage { + readonly error?: string; +} + export interface CompletionMessage extends HubMessage { readonly error?: string; readonly result?: any; diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol.ts index 05d9f1ba7e..fe51ff696d 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol.ts @@ -1,7 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -import { IHubProtocol, ProtocolType, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage } from "./IHubProtocol"; +import { IHubProtocol, ProtocolType, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage, StreamCompletionMessage } from "./IHubProtocol"; import { BinaryMessageFormat } from "./Formatters" import * as msgpack5 from "msgpack5" @@ -34,6 +34,8 @@ export class MessagePackHubProtocol implements IHubProtocol { return this.createStreamItemMessage(properties); case MessageType.Completion: return this.createCompletionMessage(properties); + case MessageType.StreamCompletion: + return this.createStreamCompletionMessage(properties); default: throw new Error("Invalid message type."); } @@ -97,7 +99,19 @@ export class MessagePackHubProtocol implements IHubProtocol { break; } - return completionMessage as ResultMessage; + return completionMessage as CompletionMessage; + } + + private createStreamCompletionMessage(properties: any[]): StreamCompletionMessage { + if (properties.length < 2) { + throw new Error("Invalid payload for Completion message."); + } + + return { + type: MessageType.StreamCompletion, + invocationId: properties[1], + error: properties.length == 3 ? properties[2] : null, + }; } writeMessage(message: HubMessage): ArrayBuffer { diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/TestHub.cs b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/TestHub.cs index 530675da51..44ed712c93 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/TestHub.cs +++ b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/TestHub.cs @@ -42,6 +42,16 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server return new string[] { "a", "b", "c" }.ToObservable(); } + public IObservable EmptyStream() + { + return Array.Empty().ToObservable(); + } + + public IObservable StreamThrowException(string message) + { + throw new InvalidOperationException(message); + } + public ComplexObject EchoComplexObject(ComplexObject complexObject) { return complexObject; diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/wwwroot/js/hubConnectionTests.js b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/wwwroot/js/hubConnectionTests.js index 71404e8e49..6f4590bd42 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/wwwroot/js/hubConnectionTests.js +++ b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/wwwroot/js/hubConnectionTests.js @@ -10,7 +10,7 @@ describe('hubConnection', function () { describe(protocol.name + ' over ' + signalR.TransportType[transportType] + ' transport', function () { it('can invoke server method and receive result', function (done) { - var message = "你好,世界!"; + var message = '你好,世界!'; var options = { transport: transportType, @@ -57,7 +57,7 @@ describe('hubConnection', function () { }); hubConnection.start().then(function () { - hubConnection.send('SendCustomObject', { Name: "test", Value: 42}); + hubConnection.send('SendCustomObject', { Name: 'test', Value: 42}); }).catch(function (e) { fail(e); done(); @@ -89,7 +89,7 @@ describe('hubConnection', function () { hubConnection.stop(); }, complete: function complete() { - expect(received).toEqual(["a", "b", "c"]); + expect(received).toEqual(['a', 'b', 'c']); hubConnection.stop(); } }); @@ -100,7 +100,7 @@ describe('hubConnection', function () { }); it('rethrows an exception from the server when invoking', function (done) { - var errorMessage = "An error occurred."; + var errorMessage = 'An error occurred.'; var options = { transport: transportType, protocol: protocol, @@ -125,8 +125,7 @@ describe('hubConnection', function () { }); }); - it('rethrows an exception from the server when streaming', function (done) { - var errorMessage = "An error occurred."; + it('throws an exception when invoking streaming method with invoke', function (done) { var options = { transport: transportType, protocol: protocol, @@ -135,12 +134,90 @@ describe('hubConnection', function () { var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options); hubConnection.start().then(function () { - hubConnection.stream('ThrowException', errorMessage).subscribe({ + hubConnection.invoke('EmptyStream').then(function () { + // exception expected but none thrown + fail(); + }).catch(function (e) { + expect(e.message).toBe('Streaming methods must be invoked using the \'HubConnection.stream()\' method.'); + }).then(function () { + return hubConnection.stop(); + }).then(function () { + done(); + }); + }).catch(function (e) { + fail(e); + done(); + }); + }); + + it('throws an exception when receiving a streaming result for method called with invoke', function (done) { + var options = { + transport: transportType, + protocol: protocol, + logging: signalR.LogLevel.Trace + }; + var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options); + + hubConnection.start().then(function () { + hubConnection.invoke('Stream').then(function () { + // exception expected but none thrown + fail(); + }).catch(function (e) { + expect(e.message).toBe('Streaming methods must be invoked using the \'HubConnection.stream()\' method.'); + }).then(function () { + return hubConnection.stop(); + }).then(function () { + done(); + }); + }).catch(function (e) { + fail(e); + done(); + }); + }); + + it('rethrows an exception from the server when streaming', function (done) { + var errorMessage = 'An error occurred.'; + var options = { + transport: transportType, + protocol: protocol, + logging: signalR.LogLevel.Trace + }; + var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options); + + hubConnection.start().then(function () { + hubConnection.stream('StreamThrowException', errorMessage).subscribe({ next: function next(item) { fail(); }, error: function error(err) { - expect(err.message).toEqual("An error occurred."); + expect(err.message).toEqual('An error occurred.'); + done(); + }, + complete: function complete() { + fail(); + } + }); + }).catch(function (e) { + fail(e); + done(); + }); + }); + + it('throws an exception when invoking hub method with stream', function (done) { + var options = { + transport: transportType, + protocol: protocol, + logging: signalR.LogLevel.Trace + }; + var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options); + + hubConnection.start().then(function () { + hubConnection.stream('Echo', '42').subscribe({ + next: function next(item) { + fail(); + }, + error: function error(err) { + expect(err.message).toEqual('Hub methods must be invoked using the \'HubConnection.invoke()\' method.'); done(); }, complete: function complete() { @@ -161,7 +238,7 @@ describe('hubConnection', function () { }; var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options); - var message = "你好 SignalR!"; + var message = '你好 SignalR!'; // client side method names are case insensitive var methodName = 'message'; @@ -187,9 +264,9 @@ describe('hubConnection', function () { it('closed with error if hub cannot be created', function (done) { var errorRegex = { - WebSockets: "1011|1005", // Message is browser specific (e.g. 'Websocket closed with status code: 1011'), Edge and IE report 1005 even though the server sent 1011 - LongPolling: "Internal Server Error", - ServerSentEvents: "Error occurred" + WebSockets: '1011|1005', // Message is browser specific (e.g. 'Websocket closed with status code: 1011'), Edge and IE report 1005 even though the server sent 1011 + LongPolling: 'Internal Server Error', + ServerSentEvents: 'Error occurred' }; var options = {