Adding StreamCompletion message (TS)

This commit is contained in:
Pawel Kadluczka 2017-10-17 13:12:09 -07:00 committed by Pawel Kadluczka
parent 8d1d6d0300
commit 6bddb258cd
6 changed files with 193 additions and 68 deletions

View File

@ -8,6 +8,7 @@ import { TransportType, ITransport, TransferMode } from "../Microsoft.AspNetCore
import { Observer } from "../Microsoft.AspNetCore.SignalR.Client.TS/Observable"
import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
import { ILogger, LogLevel } from "../Microsoft.AspNetCore.SignalR.Client.TS/ILogger"
import { MessageType } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol"
import { asyncit as it, captureException } from './JasmineUtils';
@ -37,7 +38,7 @@ describe("HubConnection", () => {
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
type: 1,
type: MessageType.Invocation,
invocationId: connection.lastInvocationId,
target: "testMethod",
nonblocking: true,
@ -63,7 +64,7 @@ describe("HubConnection", () => {
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
type: 1,
type: MessageType.Invocation,
invocationId: connection.lastInvocationId,
target: "testMethod",
nonblocking: false,
@ -83,7 +84,7 @@ describe("HubConnection", () => {
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, error: "foo" });
let ex = await captureException(async () => invokePromise);
expect(ex.message).toBe("foo");
@ -95,7 +96,7 @@ describe("HubConnection", () => {
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" });
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" });
expect(await invokePromise).toBe("foo");
});
@ -108,7 +109,7 @@ describe("HubConnection", () => {
hubConnection.stop();
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Invocation cancelled due to connection being closed.");
expect(ex.message).toBe("Invocation canceled due to connection being closed.");
});
it("completes pending invocations when connection is lost", async () => {
@ -123,17 +124,30 @@ describe("HubConnection", () => {
expect(ex.message).toBe("Connection lost");
});
it("rejects streaming responses made using 'invoke'", async () => {
it("rejects streaming results made using 'invoke'", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod");
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: null });
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: null });
connection.onclose();
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Streaming methods must be invoked using HubConnection.stream");
expect(ex.message).toBe("Streaming methods must be invoked using the 'HubConnection.stream()' method.");
});
it("rejects streaming completions made using 'invoke'", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod");
connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId });
connection.onclose();
let ex = await captureException(async () => await invokePromise);
expect(ex.message).toBe("Streaming methods must be invoked using the 'HubConnection.stream()' method.");
});
});
@ -151,7 +165,7 @@ describe("HubConnection", () => {
let hubConnection = new HubConnection(connection, { logging: logger });
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: ["test"],
@ -168,7 +182,7 @@ describe("HubConnection", () => {
hubConnection.on("message", v => value = v);
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: ["test"],
@ -187,7 +201,7 @@ describe("HubConnection", () => {
hubConnection.on("message", () => numInvocations2++);
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: [],
@ -207,7 +221,7 @@ describe("HubConnection", () => {
hubConnection.on("message", callback);
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: [],
@ -217,7 +231,7 @@ describe("HubConnection", () => {
hubConnection.off("message", callback);
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: [],
@ -259,7 +273,7 @@ describe("HubConnection", () => {
// invoke a method to make sure we are not trying to use null/undefined
connection.receive({
type: 1,
type: MessageType.Invocation,
invocationId: 0,
target: "message",
arguments: [],
@ -287,7 +301,7 @@ describe("HubConnection", () => {
// Verify the message is sent
expect(connection.sentData.length).toBe(1);
expect(JSON.parse(connection.sentData[0])).toEqual({
type: 1,
type: MessageType.Invocation,
invocationId: connection.lastInvocationId,
target: "testStream",
nonblocking: false,
@ -309,7 +323,7 @@ describe("HubConnection", () => {
hubConnection.stream<any>("testMethod", "arg", 42)
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId, error: "foo" });
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: foo");
@ -323,7 +337,7 @@ describe("HubConnection", () => {
hubConnection.stream<any>("testMethod", "arg", 42)
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId });
expect(await observer.completed).toEqual([]);
});
@ -338,7 +352,7 @@ describe("HubConnection", () => {
hubConnection.stop();
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: Invocation cancelled due to connection being closed.");
expect(ex.message).toEqual("Error: Invocation canceled due to connection being closed.");
});
it("completes pending streams when connection is lost", async () => {
@ -364,10 +378,10 @@ describe("HubConnection", () => {
hubConnection.stream<any>("testMethod")
.subscribe(observer);
connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" });
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" });
let ex = await captureException(async () => await observer.completed);
expect(ex.message).toEqual("Error: Server provided a result in a completion response to a streamed invocation.");
expect(ex.message).toEqual("Error: Hub methods must be invoked using the 'HubConnection.invoke()' method.");
});
it("yields items as they arrive", async () => {
@ -378,16 +392,16 @@ describe("HubConnection", () => {
hubConnection.stream<any>("testMethod")
.subscribe(observer);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 1 });
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 1 });
expect(observer.itemsReceived).toEqual([1]);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 2 });
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 2 });
expect(observer.itemsReceived).toEqual([1, 2]);
connection.receive({ type: 2, invocationId: connection.lastInvocationId, item: 3 });
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 3 });
expect(observer.itemsReceived).toEqual([1, 2, 3]);
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
connection.receive({ type: MessageType.StreamCompletion, invocationId: connection.lastInvocationId });
expect(await observer.completed).toEqual([1, 2, 3]);
});
@ -414,7 +428,7 @@ describe("HubConnection", () => {
// Send completion to trigger observer.complete()
// Expectation is connection.receive will not to throw
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId });
});
});

View File

@ -6,7 +6,7 @@ import { IConnection } from "./IConnection"
import { HttpConnection} from "./HttpConnection"
import { TransportType, TransferMode } from "./Transports"
import { Subject, Observable } from "./Observable"
import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, ResultMessage, InvocationMessage, NegotiationMessage } from "./IHubProtocol";
import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, StreamCompletionMessage, ResultMessage, InvocationMessage, NegotiationMessage } from "./IHubProtocol";
import { JsonHubProtocol } from "./JsonHubProtocol";
import { TextMessageFormat } from "./Formatters"
import { Base64EncodedHubProtocol } from "./Base64EncodedHubProtocol"
@ -24,7 +24,7 @@ export class HubConnection {
private readonly connection: IConnection;
private readonly logger: ILogger;
private protocol: IHubProtocol;
private callbacks: Map<string, (invocationUpdate: CompletionMessage | ResultMessage) => void>;
private callbacks: Map<string, (invocationEvent: HubMessage, error?: Error) => void>;
private methods: Map<string, ((...args: any[]) => void)[]>;
private id: number;
private closedCallbacks: ConnectionClosed[];
@ -44,7 +44,7 @@ export class HubConnection {
this.connection.onreceive = (data: any) => this.processIncomingData(data);
this.connection.onclose = (error?: Error) => this.connectionClosed(error);
this.callbacks = new Map<string, (invocationEvent: CompletionMessage | ResultMessage) => void>();
this.callbacks = new Map<string, (invocationEvent: HubMessage, error?: Error) => void>();
this.methods = new Map<string, ((...args: any[]) => void)[]>();
this.closedCallbacks = [];
this.id = 0;
@ -63,9 +63,10 @@ export class HubConnection {
break;
case MessageType.Result:
case MessageType.Completion:
case MessageType.StreamCompletion:
let callback = this.callbacks.get(message.invocationId);
if (callback != null) {
if (message.type == MessageType.Completion) {
if (message.type == MessageType.Completion || message.type == MessageType.StreamCompletion) {
this.callbacks.delete(message.invocationId);
}
callback(message);
@ -92,14 +93,8 @@ export class HubConnection {
}
private connectionClosed(error?: Error) {
let errorCompletionMessage = <CompletionMessage>{
type: MessageType.Completion,
invocationId: "-1",
error: error ? error.message : "Invocation cancelled due to connection being closed.",
};
this.callbacks.forEach(callback => {
callback(errorCompletionMessage);
callback(undefined, error ? error : new Error("Invocation canceled due to connection being closed."));
});
this.callbacks.clear();
@ -136,22 +131,28 @@ export class HubConnection {
let subject = new Subject<T>();
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => {
if (invocationEvent.type === MessageType.Completion) {
let completionMessage = <CompletionMessage>invocationEvent;
if (completionMessage.error) {
subject.error(new Error(completionMessage.error));
}
else if (completionMessage.result) {
subject.error(new Error("Server provided a result in a completion response to a streamed invocation."));
}
else {
// TODO: Log a warning if there's a payload?
subject.complete();
}
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: HubMessage, error?: Error) => {
if (error) {
subject.error(error);
return;
}
else {
subject.next(<T>(<ResultMessage>invocationEvent).item);
switch (invocationEvent.type) {
case MessageType.StreamCompletion:
let completionMessage = <StreamCompletionMessage>invocationEvent;
if (completionMessage.error) {
subject.error(new Error(completionMessage.error));
}
else {
subject.complete();
}
break;
case MessageType.Result:
subject.next(<T>(<ResultMessage>invocationEvent).item);
break;
default:
subject.error(new Error("Hub methods must be invoked using the 'HubConnection.invoke()' method."));
break;
}
});
@ -178,7 +179,11 @@ export class HubConnection {
let invocationDescriptor = this.createInvocation(methodName, args, false);
let p = new Promise<any>((resolve, reject) => {
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => {
this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: HubMessage, error?: Error) => {
if (error) {
reject(error);
return;
}
if (invocationEvent.type === MessageType.Completion) {
let completionMessage = <CompletionMessage>invocationEvent;
if (completionMessage.error) {
@ -189,7 +194,7 @@ export class HubConnection {
}
}
else {
reject(new Error("Streaming methods must be invoked using HubConnection.stream"))
reject(new Error("Streaming methods must be invoked using the 'HubConnection.stream()' method."));
}
});

View File

@ -4,7 +4,8 @@
export const enum MessageType {
Invocation = 1,
Result,
Completion
Completion,
StreamCompletion
}
export interface HubMessage {
@ -22,6 +23,10 @@ export interface ResultMessage extends HubMessage {
readonly item?: any;
}
export interface StreamCompletionMessage extends HubMessage {
readonly error?: string;
}
export interface CompletionMessage extends HubMessage {
readonly error?: string;
readonly result?: any;

View File

@ -1,7 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
import { IHubProtocol, ProtocolType, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage } from "./IHubProtocol";
import { IHubProtocol, ProtocolType, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage, StreamCompletionMessage } from "./IHubProtocol";
import { BinaryMessageFormat } from "./Formatters"
import * as msgpack5 from "msgpack5"
@ -34,6 +34,8 @@ export class MessagePackHubProtocol implements IHubProtocol {
return this.createStreamItemMessage(properties);
case MessageType.Completion:
return this.createCompletionMessage(properties);
case MessageType.StreamCompletion:
return this.createStreamCompletionMessage(properties);
default:
throw new Error("Invalid message type.");
}
@ -97,7 +99,19 @@ export class MessagePackHubProtocol implements IHubProtocol {
break;
}
return completionMessage as ResultMessage;
return completionMessage as CompletionMessage;
}
private createStreamCompletionMessage(properties: any[]): StreamCompletionMessage {
if (properties.length < 2) {
throw new Error("Invalid payload for Completion message.");
}
return <StreamCompletionMessage>{
type: MessageType.StreamCompletion,
invocationId: properties[1],
error: properties.length == 3 ? properties[2] : null,
};
}
writeMessage(message: HubMessage): ArrayBuffer {

View File

@ -42,6 +42,16 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
return new string[] { "a", "b", "c" }.ToObservable();
}
public IObservable<int> EmptyStream()
{
return Array.Empty<int>().ToObservable();
}
public IObservable<string> StreamThrowException(string message)
{
throw new InvalidOperationException(message);
}
public ComplexObject EchoComplexObject(ComplexObject complexObject)
{
return complexObject;

View File

@ -10,7 +10,7 @@ describe('hubConnection', function () {
describe(protocol.name + ' over ' + signalR.TransportType[transportType] + ' transport', function () {
it('can invoke server method and receive result', function (done) {
var message = "你好,世界!";
var message = '你好,世界!';
var options = {
transport: transportType,
@ -57,7 +57,7 @@ describe('hubConnection', function () {
});
hubConnection.start().then(function () {
hubConnection.send('SendCustomObject', { Name: "test", Value: 42});
hubConnection.send('SendCustomObject', { Name: 'test', Value: 42});
}).catch(function (e) {
fail(e);
done();
@ -89,7 +89,7 @@ describe('hubConnection', function () {
hubConnection.stop();
},
complete: function complete() {
expect(received).toEqual(["a", "b", "c"]);
expect(received).toEqual(['a', 'b', 'c']);
hubConnection.stop();
}
});
@ -100,7 +100,7 @@ describe('hubConnection', function () {
});
it('rethrows an exception from the server when invoking', function (done) {
var errorMessage = "An error occurred.";
var errorMessage = 'An error occurred.';
var options = {
transport: transportType,
protocol: protocol,
@ -125,8 +125,7 @@ describe('hubConnection', function () {
});
});
it('rethrows an exception from the server when streaming', function (done) {
var errorMessage = "An error occurred.";
it('throws an exception when invoking streaming method with invoke', function (done) {
var options = {
transport: transportType,
protocol: protocol,
@ -135,12 +134,90 @@ describe('hubConnection', function () {
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
hubConnection.start().then(function () {
hubConnection.stream('ThrowException', errorMessage).subscribe({
hubConnection.invoke('EmptyStream').then(function () {
// exception expected but none thrown
fail();
}).catch(function (e) {
expect(e.message).toBe('Streaming methods must be invoked using the \'HubConnection.stream()\' method.');
}).then(function () {
return hubConnection.stop();
}).then(function () {
done();
});
}).catch(function (e) {
fail(e);
done();
});
});
it('throws an exception when receiving a streaming result for method called with invoke', function (done) {
var options = {
transport: transportType,
protocol: protocol,
logging: signalR.LogLevel.Trace
};
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
hubConnection.start().then(function () {
hubConnection.invoke('Stream').then(function () {
// exception expected but none thrown
fail();
}).catch(function (e) {
expect(e.message).toBe('Streaming methods must be invoked using the \'HubConnection.stream()\' method.');
}).then(function () {
return hubConnection.stop();
}).then(function () {
done();
});
}).catch(function (e) {
fail(e);
done();
});
});
it('rethrows an exception from the server when streaming', function (done) {
var errorMessage = 'An error occurred.';
var options = {
transport: transportType,
protocol: protocol,
logging: signalR.LogLevel.Trace
};
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
hubConnection.start().then(function () {
hubConnection.stream('StreamThrowException', errorMessage).subscribe({
next: function next(item) {
fail();
},
error: function error(err) {
expect(err.message).toEqual("An error occurred.");
expect(err.message).toEqual('An error occurred.');
done();
},
complete: function complete() {
fail();
}
});
}).catch(function (e) {
fail(e);
done();
});
});
it('throws an exception when invoking hub method with stream', function (done) {
var options = {
transport: transportType,
protocol: protocol,
logging: signalR.LogLevel.Trace
};
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
hubConnection.start().then(function () {
hubConnection.stream('Echo', '42').subscribe({
next: function next(item) {
fail();
},
error: function error(err) {
expect(err.message).toEqual('Hub methods must be invoked using the \'HubConnection.invoke()\' method.');
done();
},
complete: function complete() {
@ -161,7 +238,7 @@ describe('hubConnection', function () {
};
var hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, options);
var message = "你好 SignalR";
var message = '你好 SignalR';
// client side method names are case insensitive
var methodName = 'message';
@ -187,9 +264,9 @@ describe('hubConnection', function () {
it('closed with error if hub cannot be created', function (done) {
var errorRegex = {
WebSockets: "1011|1005", // Message is browser specific (e.g. 'Websocket closed with status code: 1011'), Edge and IE report 1005 even though the server sent 1011
LongPolling: "Internal Server Error",
ServerSentEvents: "Error occurred"
WebSockets: '1011|1005', // Message is browser specific (e.g. 'Websocket closed with status code: 1011'), Edge and IE report 1005 even though the server sent 1011
LongPolling: 'Internal Server Error',
ServerSentEvents: 'Error occurred'
};
var options = {