Adding MsgPack hub protocol to TS client

This commit is contained in:
Pawel Kadluczka 2017-06-30 17:33:55 -07:00 committed by Pawel Kadluczka
parent e089852d59
commit e2cec0b305
11 changed files with 331 additions and 17 deletions

View File

@ -1,4 +1,4 @@
import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
import { TextMessageFormat, BinaryMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
describe("Text Message Formatter", () => {
it("should return empty array on empty input", () => {
@ -30,3 +30,43 @@ describe("Text Message Formatter", () => {
});
});
});
describe("Binary Message Formatter", () => {
([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], <Uint8Array[]>[ new Uint8Array([])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff], <Uint8Array[]>[ new Uint8Array([0xff])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x7f], <Uint8Array[]>[ new Uint8Array([0xff]), new Uint8Array([0x7f])]],
] as [[number[], Uint8Array[]]]).forEach(([payload, expected_messages]) => {
it(`should parse '${payload}' correctly`, () => {
let messages = BinaryMessageFormat.parse(new Uint8Array(payload).buffer);
expect(messages).toEqual(expected_messages);
})
});
([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Cannot read message size")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x80, 0x00], new Error("Cannot read message size")],
[[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00], new Error("Incomplete message")],
] as [[number[], Error]]).forEach(([payload, expected_error]) => {
it(`should fail to parse '${payload}'`, () => {
expect(() => BinaryMessageFormat.parse(new Uint8Array(payload).buffer)).toThrow(expected_error);
})
});
([
[[], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]],
[[0x20], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20]],
] as [[number[], number[]]]).forEach(([input, expected_payload]) => {
it(`should write '${input}'`, () => {
let actual = new Uint8Array(BinaryMessageFormat.write(new Uint8Array(input)));
let expected = new Uint8Array(expected_payload);
expect(actual).toEqual(expected);
})
});
});

View File

@ -0,0 +1,103 @@
import { MessagePackHubProtocol } from "../Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol"
import { MessageType, InvocationMessage, CompletionMessage, ResultMessage } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol"
describe("MessageHubProtocol", () => {
it("can write/read Invocation message", () => {
let invocation = <InvocationMessage>{
type: MessageType.Invocation,
invocationId: "123",
target: "myMethod",
nonblocking: true,
arguments: [42, true, "test", ["x1", "y2"], null]
};
let protocol = new MessagePackHubProtocol();
var parsedMessages = protocol.parseMessages(protocol.writeMessage(invocation));
expect(parsedMessages).toEqual([invocation]);
});
([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x01, 0xa3, 0x45, 0x72, 0x72],
{
type: MessageType.Completion,
invocationId: "abc",
error: "Err",
result: null
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x02 ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: null
} as CompletionMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Completion message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));
([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08 ],
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Result message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));
([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xc2 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x05 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x91, 0xa1, 0x78 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x01 ], new Error("Invalid payload for Invocation message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x02 ], new Error("Invalid payload for stream Result message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x92, 0x03, 0xa0 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x94, 0x03, 0xa0, 0x02, 0x00 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x01 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x03 ], new Error("Invalid payload for Completion message.") ]
] as [[number[], Error]]).forEach(([payload, expected_error]) =>
it("throws for invalid messages", () => {
expect(() => new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer))
.toThrow(expected_error);
}));
it("can read multiple messages", () => {
let payload = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ];
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage,
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage
]);
});
});

View File

@ -34,7 +34,7 @@ export namespace TextMessageFormat {
if (!hasSpace(input, offset, 1 + length)) {
throw new Error("Message is incomplete");
}
// Read the payload
var payload = input.substr(offset, length);
offset += length;
@ -66,4 +66,56 @@ export namespace TextMessageFormat {
}
return messages;
}
}
export namespace BinaryMessageFormat {
export function write(output: Uint8Array): ArrayBuffer {
let size = output.byteLength;
let buffer = new Uint8Array(size + 8);
// javascript bitwise operators only support 32-bit integers
for (let i = 7; i >= 4; i--) {
buffer[i] = size & 0xff;
size = size >> 8;
}
buffer.set(output, 8);
return buffer.buffer;
}
export function parse(input: ArrayBuffer): Uint8Array[] {
let result: Uint8Array[] = [];
let uint8Array = new Uint8Array(input);
// 8 - the length prefix size
for (let offset = 0; offset < input.byteLength;) {
if (input.byteLength < offset + 8) {
throw new Error("Cannot read message size")
}
// Note javascript bitwise operators only support 32-bit integers - for now cutting bigger messages.
// Tracking bug https://github.com/aspnet/SignalR/issues/613
if (!(uint8Array[offset] == 0 && uint8Array[offset + 1] == 0 && uint8Array[offset + 2] == 0
&& uint8Array[offset + 3] == 0 && (uint8Array[offset + 4] & 0x80) == 0)) {
throw new Error("Messages bigger than 2147483647 bytes are not supported");
}
let size = 0;
for (let i = 4; i < 8; i++) {
size = (size << 8) | uint8Array[offset + i];
}
if (uint8Array.byteLength >= (offset + 8 + size)) {
result.push(uint8Array.slice(offset + 8, offset + 8 + size))
}
else {
throw new Error("Incomplete message");
}
offset = offset + 8 + size;
}
return result;
}
}

View File

@ -16,8 +16,9 @@ export class HubConnection {
private connectionClosedCallback: ConnectionClosed;
private protocol: IHubProtocol;
constructor(connection: IConnection) {
constructor(connection: IConnection, protocol: IHubProtocol = new JsonHubProtocol()) {
this.connection = connection;
this.protocol = protocol || new JsonHubProtocol();
this.connection.onDataReceived = data => {
this.onDataReceived(data);
};
@ -28,7 +29,6 @@ export class HubConnection {
this.callbacks = new Map<string, (invocationEvent: CompletionMessage | ResultMessage) => void>();
this.methods = new Map<string, (...args: any[]) => void>();
this.id = 0;
this.protocol = new JsonHubProtocol();
}
private onDataReceived(data: any) {

View File

@ -30,6 +30,6 @@ export interface NegotiationMessage {
export interface IHubProtocol {
name(): string;
parseMessages(input: string): HubMessage[];
writeMessage(message: HubMessage): string;
parseMessages(input: any): HubMessage[];
writeMessage(message: HubMessage): any;
}

View File

@ -0,0 +1,118 @@
import { IHubProtocol, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage } from "./IHubProtocol";
import { BinaryMessageFormat } from "./Formatters"
import * as msgpack5 from "msgpack5"
export class MessagePackHubProtocol implements IHubProtocol {
name(): string {
return "messagepack";
}
parseMessages(input: ArrayBuffer): HubMessage[] {
return BinaryMessageFormat.parse(input).map(m => this.parseMessage(m));
}
private parseMessage(input: Uint8Array): HubMessage {
if (input.length == 0) {
throw new Error("Invalid payload.");
}
let msgpack = msgpack5();
let properties = msgpack.decode(new Buffer(input));
if (properties.length == 0 || !(properties instanceof Array)) {
throw new Error("Invalid payload.");
}
let messageType = properties[0] as MessageType;
switch (messageType) {
case MessageType.Invocation:
return this.createInvocationMessage(properties);
case MessageType.Result:
return this.createStreamItemMessage(properties);
case MessageType.Completion:
return this.createCompletionMessage(properties);
default:
throw new Error("Invalid message type.");
}
}
private createInvocationMessage(properties: any[]): InvocationMessage {
if (properties.length != 5) {
throw new Error("Invalid payload for Invocation message.");
}
return {
type: MessageType.Invocation,
invocationId: properties[1],
nonblocking: properties[2],
target: properties[3],
arguments: properties[4]
} as InvocationMessage;
}
private createStreamItemMessage(properties: any[]): ResultMessage {
if (properties.length != 3) {
throw new Error("Invalid payload for stream Result message.");
}
return {
type: MessageType.Result,
invocationId: properties[1],
item: properties[2]
} as ResultMessage;
}
private createCompletionMessage(properties: any[]): CompletionMessage {
if (properties.length < 3) {
throw new Error("Invalid payload for Completion message.");
}
const errorResult = 1;
const voidResult = 2;
const nonVoidResult = 3;
let resultKind = properties[2];
if ((resultKind === voidResult && properties.length != 3) ||
(resultKind !== voidResult && properties.length != 4)) {
throw new Error("Invalid payload for Completion message.");
}
let completionMessage = {
type: MessageType.Completion,
invocationId: properties[1],
error: null as string,
result: null as any
};
switch (resultKind) {
case errorResult:
completionMessage.error = properties[3];
break;
case nonVoidResult:
completionMessage.result = properties[3];
break;
}
return completionMessage as ResultMessage;
}
writeMessage(message: HubMessage): ArrayBuffer {
switch (message.type) {
case MessageType.Invocation:
return this.writeInvocation(message as InvocationMessage);
case MessageType.Result:
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
default:
throw new Error("Invalid message type.");
}
}
private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
let msgpack = msgpack5();
let payload = msgpack.encode([ MessageType.Invocation, invocationMessage.invocationId,
invocationMessage.nonblocking, invocationMessage.target, invocationMessage.arguments]);
return BinaryMessageFormat.write(payload.slice());
}
}

View File

@ -24,6 +24,7 @@ gulp.task('browserify-client', ['compile-ts-client'], () => {
.pipe(gulp.dest(clientOutDir + '/../browser'));
});
gulp.task('build-ts-client', ['clean', 'compile-ts-client', 'browserify-client']);
gulp.task('default', ['build-ts-client']);

View File

@ -5,7 +5,7 @@ describe('hubConnection', () => {
describe(`${signalR.TransportType[transportType]} transport`, () => {
it(`can invoke server method and receive result`, done => {
const message = "Hi";
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }), 'formatType=json&format=text');
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }));
hubConnection.onClosed = error => {
expect(error).toBe(undefined);
done();
@ -31,7 +31,7 @@ describe('hubConnection', () => {
});
it(`can stream server method and receive result`, done => {
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }), 'formatType=json&format=text');
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }));
hubConnection.onClosed = error => {
expect(error).toBe(undefined);
done();
@ -63,7 +63,7 @@ describe('hubConnection', () => {
it(`rethrows an exception from the server when invoking`, done => {
const errorMessage = "An error occurred.";
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }), 'formatType=json&format=text');
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }));
hubConnection.start()
.then(() => {
@ -90,7 +90,7 @@ describe('hubConnection', () => {
it(`rethrows an exception from the server when streaming`, done => {
const errorMessage = "An error occurred.";
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }), 'formatType=json&format=text');
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }));
hubConnection.start()
.then(() => {
@ -116,7 +116,7 @@ describe('hubConnection', () => {
});
it(`can receive server calls`, done => {
let client = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }), 'formatType=json&format=text');
let client = new signalR.HubConnection(new signalR.HttpConnection(TESTHUBENDPOINT_URL, { transport: transportType }));
const message = "Hello SignalR";
let callbackPromise = new Promise((resolve, reject) => {
@ -150,7 +150,7 @@ describe('hubConnection', () => {
ServerSentEvents: "Error occurred"
};
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(`http://${document.location.host}/uncreatable`, { transport: transportType }), 'formatType=json&format=text');
let hubConnection = new signalR.HubConnection(new signalR.HttpConnection(`http://${document.location.host}/uncreatable`, { transport: transportType }));
hubConnection.onClosed = error => {
expect(error).toMatch(errorRegex[signalR.TransportType[transportType]]);

View File

@ -23,11 +23,13 @@
},
"homepage": "https://github.com/aspnet/SignalR#readme",
"devDependencies": {
"@types/msgpack5": "^3.4.0",
"browserify": "^13.1.1",
"del": "^2.2.2",
"gulp": "^3.9.1",
"gulp-typescript": "^3.1.3",
"jasmine": "^2.5.2",
"msgpack5": "^3.5.0",
"typescript": "^2.0.10",
"vinyl-source-stream": "^1.1.0"
}

View File

@ -136,7 +136,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
WriteInvocationMessage(invocationMessage, packer, output);
break;
case StreamItemMessage streamItemMessage:
WriteStremingItemMessage(streamItemMessage, packer, output);
WriteStreamingItemMessage(streamItemMessage, packer, output);
break;
case CompletionMessage completionMessage:
WriteCompletionMessage(completionMessage, packer, output);
@ -156,7 +156,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
packer.PackObject(invocationMessage.Arguments);
}
private void WriteStremingItemMessage(StreamItemMessage streamItemMessage, Packer packer, Stream output)
private void WriteStreamingItemMessage(StreamItemMessage streamItemMessage, Packer packer, Stream output)
{
packer.PackArrayHeader(3);
packer.Pack(StreamItemMessageType);
@ -171,7 +171,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
completionMessage.HasResult ? NonVoidResult :
VoidResult;
packer.PackArrayHeader(2 + resultKind != VoidResult ? 1 : 0);
packer.PackArrayHeader(3 + resultKind != VoidResult ? 1 : 0);
packer.Pack(CompletionMessageType);
packer.PackString(completionMessage.InvocationId);
packer.Pack(resultKind);

View File

@ -109,8 +109,6 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
new object[] { new byte[] { 0x93, 0x03, 0xa3, 0x78, 0x79, 0x7a, 0x03 }, "Deserializing object of the `String` type for 'argument' failed." }, // non void result but result missing
new object[] { new byte[] { 0x93, 0x03, 0xa3, 0x78, 0x79, 0x7a, 0x03, 0xa9 }, "Deserializing object of the `String` type for 'argument' failed." }, // result is cut
new object[] { new byte[] { 0x93, 0x03, 0xa3, 0x78, 0x79, 0x7a, 0x03, 0x00 }, "Deserializing object of the `String` type for 'argument' failed." }, // return type mismatch
// TODO: ReadAsInt32 and no int32 value
};
[Theory]