First pass at Keep Alive (#1119)

This adds the Ping message type and support for sending/receiving it in the Hub Protocols. It does not add the logic to transmit keep-alive frames.
This commit is contained in:
Andrew Stanton-Nurse 2017-11-16 09:45:13 -08:00 committed by GitHub
parent 379160707f
commit cb3124be17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 436 additions and 177 deletions

View File

@ -129,7 +129,7 @@ describe("HubConnection", () => {
it("invocations ignored in callbacks not registered", async () => {
let warnings: string[] = [];
let logger = <ILogger>{
log: function(logLevel: LogLevel, message: string) {
log: function (logLevel: LogLevel, message: string) {
if (logLevel === LogLevel.Warning) {
warnings.push(message);
}
@ -219,15 +219,15 @@ describe("HubConnection", () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
hubConnection.off("_", () => {});
hubConnection.on("message", t => {});
hubConnection.on("message", () => {});
hubConnection.off("_", () => { });
hubConnection.on("message", t => { });
hubConnection.on("message", () => { });
});
it("using null/undefined for methodName or method no-ops", async () => {
let warnings: string[] = [];
let logger = <ILogger>{
log: function(logLevel: LogLevel, message: string) {
log: function (logLevel: LogLevel, message: string) {
if (logLevel === LogLevel.Warning) {
warnings.push(message);
}
@ -242,8 +242,8 @@ describe("HubConnection", () => {
hubConnection.on(undefined, null);
hubConnection.on("message", null);
hubConnection.on("message", undefined);
hubConnection.on(null, () => {});
hubConnection.on(undefined, () => {});
hubConnection.on(null, () => { });
hubConnection.on(undefined, () => { });
// invoke a method to make sure we are not trying to use null/undefined
connection.receive({
@ -260,8 +260,8 @@ describe("HubConnection", () => {
hubConnection.off(undefined, null);
hubConnection.off("message", null);
hubConnection.off("message", undefined);
hubConnection.off(null, () => {});
hubConnection.off(undefined, () => {});
hubConnection.off(null, () => { });
hubConnection.off(undefined, () => { });
});
});
@ -351,13 +351,13 @@ describe("HubConnection", () => {
hubConnection.stream<any>("testMethod")
.subscribe(observer);
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 1 });
connection.receive({ type: MessageType.StreamItem, invocationId: connection.lastInvocationId, item: 1 });
expect(observer.itemsReceived).toEqual([1]);
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 2 });
connection.receive({ type: MessageType.StreamItem, invocationId: connection.lastInvocationId, item: 2 });
expect(observer.itemsReceived).toEqual([1, 2]);
connection.receive({ type: MessageType.Result, invocationId: connection.lastInvocationId, item: 3 });
connection.receive({ type: MessageType.StreamItem, invocationId: connection.lastInvocationId, item: 3 });
expect(observer.itemsReceived).toEqual([1, 2, 3]);
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId });
@ -392,7 +392,7 @@ describe("HubConnection", () => {
});
describe("onClose", () => {
it("it can have multiple callbacks", async () => {
it("can have multiple callbacks", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let invocations = 0;
@ -424,6 +424,21 @@ describe("HubConnection", () => {
// expect no errors
});
});
describe("keepAlive", () => {
it("can receive ping messages", async () => {
// Receive the ping mid-invocation so we can see that the rest of the flow works fine
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let invokePromise = hubConnection.invoke("testMethod", "arg", 42);
connection.receive({ type: MessageType.Ping });
connection.receive({ type: MessageType.Completion, invocationId: connection.lastInvocationId, result: "foo" });
expect(await invokePromise).toBe("foo");
})
})
});
class TestConnection implements IConnection {
@ -435,7 +450,10 @@ class TestConnection implements IConnection {
send(data: any): Promise<void> {
let invocation = TextMessageFormat.parse(data)[0];
this.lastInvocationId = JSON.parse(invocation).invocationId;
let invocationId = JSON.parse(invocation).invocationId;
if (invocationId) {
this.lastInvocationId = invocationId;
}
if (this.sentData) {
this.sentData.push(invocation);
}

View File

@ -20,27 +20,27 @@ describe("MessageHubProtocol", () => {
});
([
[ [ 0x0b, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x01, 0xa3, 0x45, 0x72, 0x72],
{
type: MessageType.Completion,
invocationId: "abc",
error: "Err",
result: null
} as CompletionMessage ],
[ [ 0x0a, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage ],
[ [ 0x07, 0x93, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x02 ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: null
} as CompletionMessage ]
[[0x0b, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x01, 0xa3, 0x45, 0x72, 0x72],
{
type: MessageType.Completion,
invocationId: "abc",
error: "Err",
result: null
} as CompletionMessage],
[[0x0a, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage],
[[0x07, 0x93, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x02],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: null
} as CompletionMessage]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Completion message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
@ -48,12 +48,12 @@ describe("MessageHubProtocol", () => {
}));
([
[ [ 0x07, 0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08 ],
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage ]
[[0x07, 0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08],
{
type: MessageType.StreamItem,
invocationId: "abc",
item: 8
} as ResultMessage]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Result message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
@ -61,31 +61,31 @@ describe("MessageHubProtocol", () => {
}));
([
[ [ 0x00 ], new Error("Invalid payload.") ],
[ [ 0x01, 0x90 ], new Error("Invalid payload.") ],
[ [ 0x01, 0xc2 ], new Error("Invalid payload.") ],
[ [ 0x02, 0x91, 0x05 ], new Error("Invalid message type.") ],
[ [ 0x03, 0x91, 0xa1, 0x78 ], new Error("Invalid message type.") ],
[ [ 0x02, 0x91, 0x01 ], new Error("Invalid payload for Invocation message.") ],
[ [ 0x02, 0x91, 0x02 ], new Error("Invalid payload for stream Result message.") ],
[ [ 0x03, 0x92, 0x03, 0xa0 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x05, 0x94, 0x03, 0xa0, 0x02, 0x00 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x04, 0x93, 0x03, 0xa0, 0x01 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x04, 0x93, 0x03, 0xa0, 0x03 ], new Error("Invalid payload for Completion message.") ]
[[0x00], new Error("Invalid payload.")],
[[0x01, 0x90], new Error("Invalid payload.")],
[[0x01, 0xc2], new Error("Invalid payload.")],
[[0x02, 0x91, 0x05], new Error("Invalid message type.")],
[[0x03, 0x91, 0xa1, 0x78], new Error("Invalid message type.")],
[[0x02, 0x91, 0x01], new Error("Invalid payload for Invocation message.")],
[[0x02, 0x91, 0x02], new Error("Invalid payload for stream Result message.")],
[[0x03, 0x92, 0x03, 0xa0], new Error("Invalid payload for Completion message.")],
[[0x05, 0x94, 0x03, 0xa0, 0x02, 0x00], new Error("Invalid payload for Completion message.")],
[[0x04, 0x93, 0x03, 0xa0, 0x01], new Error("Invalid payload for Completion message.")],
[[0x04, 0x93, 0x03, 0xa0, 0x03], new Error("Invalid payload for Completion message.")]
] as [[number[], Error]]).forEach(([payload, expected_error]) =>
it("throws for invalid messages", () => {
expect(() => new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer))
.toThrow(expected_error);
.toThrow(expected_error);
}));
it("can read multiple messages", () => {
let payload = [
0x07, 0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08,
0x0a, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ];
0x0a, 0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b];
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([
{
type: MessageType.Result,
type: MessageType.StreamItem,
invocationId: "abc",
item: 8
} as ResultMessage,
@ -97,4 +97,18 @@ describe("MessageHubProtocol", () => {
} as CompletionMessage
]);
});
it("can read ping message", () => {
let payload = [
0x02,
0x91, // message array length = 1 (fixarray)
0x06, // type = 6 = Ping (fixnum)
];
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([
{
type: MessageType.Ping,
}
])
})
});

View File

@ -3,10 +3,10 @@
import { ConnectionClosed } from "./Common"
import { IConnection } from "./IConnection"
import { HttpConnection} from "./HttpConnection"
import { HttpConnection } from "./HttpConnection"
import { TransportType, TransferMode } from "./Transports"
import { Subject, Observable } from "./Observable"
import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, ResultMessage, InvocationMessage, StreamInvocationMessage, NegotiationMessage } from "./IHubProtocol";
import { IHubProtocol, ProtocolType, MessageType, HubMessage, CompletionMessage, ResultMessage, InvocationMessage, StreamInvocationMessage, NegotiationMessage, HubInvocationMessage } from "./IHubProtocol";
import { JsonHubProtocol } from "./JsonHubProtocol";
import { TextMessageFormat } from "./Formatters"
import { Base64EncodedHubProtocol } from "./Base64EncodedHubProtocol"
@ -61,16 +61,19 @@ export class HubConnection {
case MessageType.Invocation:
this.invokeClientMethod(<InvocationMessage>message);
break;
case MessageType.Result:
case MessageType.StreamItem:
case MessageType.Completion:
let callback = this.callbacks.get(message.invocationId);
let callback = this.callbacks.get((<HubInvocationMessage>message).invocationId);
if (callback != null) {
if (message.type === MessageType.Completion) {
this.callbacks.delete(message.invocationId);
this.callbacks.delete((<HubInvocationMessage>message).invocationId);
}
callback(message);
}
break;
case MessageType.Ping:
// Don't care about pings
break;
default:
this.logger.log(LogLevel.Warning, "Invalid message type: " + data);
break;

View File

@ -3,32 +3,37 @@
export const enum MessageType {
Invocation = 1,
Result,
Completion,
StreamInvocation
StreamItem = 2,
Completion = 3,
StreamInvocation = 4,
CancelInvocation = 5,
Ping = 6,
}
export interface HubMessage {
readonly type: MessageType;
}
export interface HubInvocationMessage extends HubMessage {
readonly invocationId: string;
}
export interface InvocationMessage extends HubMessage {
export interface InvocationMessage extends HubInvocationMessage {
readonly target: string;
readonly arguments: Array<any>;
readonly nonblocking?: boolean;
}
export interface StreamInvocationMessage extends HubMessage {
export interface StreamInvocationMessage extends HubInvocationMessage {
readonly target: string;
readonly arguments: Array<any>
}
export interface ResultMessage extends HubMessage {
export interface ResultMessage extends HubInvocationMessage {
readonly item?: any;
}
export interface CompletionMessage extends HubMessage {
export interface CompletionMessage extends HubInvocationMessage {
readonly error?: string;
readonly result?: any;
}

View File

@ -30,15 +30,27 @@ export class MessagePackHubProtocol implements IHubProtocol {
switch (messageType) {
case MessageType.Invocation:
return this.createInvocationMessage(properties);
case MessageType.Result:
case MessageType.StreamItem:
return this.createStreamItemMessage(properties);
case MessageType.Completion:
return this.createCompletionMessage(properties);
case MessageType.Ping:
return this.createPingMessage(properties);
default:
throw new Error("Invalid message type.");
}
}
private createPingMessage(properties: any[]): HubMessage {
if (properties.length != 1) {
throw new Error("Invalid payload for Ping message.");
}
return {
type: properties[0]
} as HubMessage;
}
private createInvocationMessage(properties: any[]): InvocationMessage {
if (properties.length != 5) {
throw new Error("Invalid payload for Invocation message.");
@ -59,7 +71,7 @@ export class MessagePackHubProtocol implements IHubProtocol {
}
return {
type: MessageType.Result,
type: MessageType.StreamItem,
invocationId: properties[1],
item: properties[2]
} as ResultMessage;
@ -106,7 +118,7 @@ export class MessagePackHubProtocol implements IHubProtocol {
return this.writeInvocation(message as InvocationMessage);
case MessageType.StreamInvocation:
return this.writeStreamInvocation(message as StreamInvocationMessage);
case MessageType.Result:
case MessageType.StreamItem:
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
default:
@ -116,16 +128,16 @@ export class MessagePackHubProtocol implements IHubProtocol {
private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
let msgpack = msgpack5();
let payload = msgpack.encode([ MessageType.Invocation, invocationMessage.invocationId,
invocationMessage.nonblocking, invocationMessage.target, invocationMessage.arguments]);
let payload = msgpack.encode([MessageType.Invocation, invocationMessage.invocationId,
invocationMessage.nonblocking, invocationMessage.target, invocationMessage.arguments]);
return BinaryMessageFormat.write(payload.slice());
}
private writeStreamInvocation(streamInvocationMessage: StreamInvocationMessage): ArrayBuffer {
let msgpack = msgpack5();
let payload = msgpack.encode([ MessageType.StreamInvocation, streamInvocationMessage.invocationId,
streamInvocationMessage.target, streamInvocationMessage.arguments]);
let payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.invocationId,
streamInvocationMessage.target, streamInvocationMessage.arguments]);
return BinaryMessageFormat.write(payload.slice());
}

View File

@ -26,6 +26,7 @@ In the SignalR protocol, the following types of messages can be sent:
* `StreamItem` Message - Indicates individual items of streamed response data from a previous Invocation message.
* `Completion` Message - Indicates a previous Invocation or StreamInvocation has completed. Contains an error if the invocation concluded with an error or the result of a non-streaming method invocation. The result will be absent for `void` methods. In case of streaming invocations no further `StreamItem` messages will be received
* `CancelInvocation` Message - Sent by the client to cancel a streaming invocation on the server.
* `Ping` Message - Sent by either party to check if the connection is active.
After opening a connection to the server the client must send a `Negotiation` message to the server as its first message. The negotiation message is **always** a JSON message and contains the name of the format (protocol) that will be used for the duration of the connection. If the server does not support the protocol requested by the client or the first message received from the client is not a `Negotiation` message the server must close the connection.
@ -97,7 +98,18 @@ If either endpoint commits a Protocol Error (see examples below), the other endp
* It is a protocol error for a Caller to send a `Completion` message carrying both a result and an error.
* It is a protocol error for an `Invocation` or `StreamInvocation` message to have an `Invocation ID` that has already been used by *that* endpoint. However, it is **not an error** for one endpoint to use an `Invocation ID` that was previously used by the other endpoint (allowing each endpoint to track it's own IDs).
## Examples
## Ping (aka "Keep Alive")
The SignalR Hub protocol supports "Keep Alive" messages used to ensure that the underlying transport connection remains active. These messages help ensure:
1. Proxies don't close the underlying connection during idle times (when few messages are being sent)
2. If the underlying connection is dropped without being terminated gracefully, the application is informed as quickly as possible.
Keep alive behavior is achieved via the `Ping` message type. **Either endpoint** may send a `Ping` message at any time. The receiving endpoint may choose to ignore the message, it has no obligation to respond in anyway. Most implementations will want to reset a timeout used to determine if the other party is present.
Ping messages do not have any payload, they are completely empty messages (aside from the encoding necessary to identify the message as a `Ping` message).
## Example
Consider the following C# methods
@ -233,6 +245,12 @@ S->C: Completion { Id = 42 } // This can be ignored
C->S: Invocation { Id = 42, Target = "NonBlocking", Arguments = [ "foo" ], NonBlocking = true }
```
### Ping
```
C->S: Ping
```
## JSON Encoding
In the JSON Encoding of the SignalR Protocol, each Message is represented as a single JSON object, which should be the only content of the underlying message from the Transport. All property names are case-sensitive. The underlying protocol is expected to handle encoding and decoding of the text, so the JSON string should be encoded in whatever form is expected by the underlying transport. For example, when using the ASP.NET Sockets transports, UTF-8 encoding is always used for text.
@ -381,6 +399,18 @@ Example
}
```
### Ping Message Encoding
A `Ping` message is a JSON object with the following properties:
* `type` - A `Number` with the literal value `6`, indicating that this is a `Ping`.
Example
```json
{
"type": 6
}
```
### JSON Payload Encoding
Items in the arguments array within the `Invocation` message type, as well as the `item` value of the `StreamItem` message and the `result` value of the `Completion` message, encode values which have meaning to each particular Binder. A general guideline for encoding/decoding these values is provided in the "Type Mapping" section at the end of this document, but Binders should provide configuration to applications to allow them to customize these mappings. These mappings need not be self-describing, because when decoding the value, the Binder is expected to know the destination type (by looking up the definition of the method indicated by the Target).
@ -605,6 +635,30 @@ is decoded as follows:
* `0x79` - `y`
* `0x7a` - `z`
### Ping Message Encoding
`Ping` messages have the following structure
```
[6]
```
* `6` - Message Type - `6` indicates this is a `Ping` message.
Examples:
#### Ping message
The following payload:
```
0x91 0x06
```
is decoded as follows:
* `0x91` - 1-element array
* `0x06` - `6` (Message Type - `Ping` message)
## Protocol Buffers (ProtoBuf) Encoding
**Protobuf encoding is currently not implemented**

View File

@ -133,7 +133,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
public IDisposable On(string methodName, Type[] parameterTypes, Func<object[], object, Task> handler, object state)
{
var invocationHandler = new InvocationHandler(parameterTypes, handler, state);
var invocationList = _handlers.AddOrUpdate(methodName, _ => new List<InvocationHandler> { invocationHandler },
var invocationList = _handlers.AddOrUpdate(methodName, _ => new List<InvocationHandler> { invocationHandler },
(_, invocations) =>
{
lock (invocations)
@ -246,7 +246,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
return SendHubMessage(invocationMessage, irq);
}
private async Task SendHubMessage(HubMessage hubMessage, InvocationRequest irq)
private async Task SendHubMessage(HubInvocationMessage hubMessage, InvocationRequest irq)
{
try
{
@ -328,6 +328,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
case PingMessage _:
// Nothing to do on receipt of a ping.
break;
default:
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;

View File

@ -1,12 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class CancelInvocationMessage : HubMessage
public class CancelInvocationMessage : HubInvocationMessage
{
public CancelInvocationMessage(string invocationId) : base(invocationId)
{
}
}
}
}

View File

@ -5,7 +5,7 @@ using System;
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class CompletionMessage : HubMessage
public class CompletionMessage : HubInvocationMessage
{
public string Error { get; }
public object Result { get; }

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public abstract class HubInvocationMessage : HubMessage
{
public string InvocationId { get; }
protected HubInvocationMessage(string invocationId)
{
InvocationId = invocationId;
}
}
}

View File

@ -1,15 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
// This is basically just a marker type now so that HubProtocol can return a common base class other than object.
public abstract class HubMessage
{
public string InvocationId { get; }
protected HubMessage(string invocationId)
{
InvocationId = invocationId;
}
}
}

View File

@ -7,7 +7,7 @@ using System.Runtime.ExceptionServices;
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public abstract class HubMethodInvocationMessage : HubMessage
public abstract class HubMethodInvocationMessage : HubInvocationMessage
{
private readonly ExceptionDispatchInfo _argumentBindingException;
private readonly object[] _arguments;

View File

@ -0,0 +1,12 @@
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
internal static class HubProtocolConstants
{
public const int InvocationMessageType = 1;
public const int StreamItemMessageType = 2;
public const int CompletionMessageType = 3;
public const int StreamInvocationMessageType = 4;
public const int CancelInvocationMessageType = 5;
public const int PingMessageType = 6;
}
}

View File

@ -22,12 +22,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private const string TargetPropertyName = "target";
private const string NonBlockingPropertyName = "nonBlocking";
private const string ArgumentsPropertyName = "arguments";
private const int InvocationMessageType = 1;
private const int ResultMessageType = 2;
private const int CompletionMessageType = 3;
private const int StreamInvocationMessageType = 4;
private const int CancelInvocationMessageType = 5;
private const string PayloadPropertyName = "payload";
// ONLY to be used for application payloads (args, return values, etc.)
private JsonSerializer _payloadSerializer;
@ -108,16 +103,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
var type = JsonUtils.GetRequiredProperty<int>(json, TypePropertyName, JTokenType.Integer);
switch (type)
{
case InvocationMessageType:
case HubProtocolConstants.InvocationMessageType:
return BindInvocationMessage(json, binder);
case StreamInvocationMessageType:
case HubProtocolConstants.StreamInvocationMessageType:
return BindStreamInvocationMessage(json, binder);
case ResultMessageType:
return BindResultMessage(json, binder);
case CompletionMessageType:
case HubProtocolConstants.StreamItemMessageType:
return BindStreamItemMessage(json, binder);
case HubProtocolConstants.CompletionMessageType:
return BindCompletionMessage(json, binder);
case CancelInvocationMessageType:
case HubProtocolConstants.CancelInvocationMessageType:
return BindCancelInvocationMessage(json);
case HubProtocolConstants.PingMessageType:
return PingMessage.Instance;
default:
throw new FormatException($"Unknown message type: {type}");
}
@ -150,6 +147,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
case CancelInvocationMessage m:
WriteCancelInvocationMessage(m, writer);
break;
case PingMessage m:
WritePingMessage(m, writer);
break;
default:
throw new InvalidOperationException($"Unsupported message type: {message.GetType().FullName}");
}
@ -159,7 +159,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteCompletionMessage(CompletionMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, CompletionMessageType);
WriteHubInvocationMessageCommon(message, writer, HubProtocolConstants.CompletionMessageType);
if (!string.IsNullOrEmpty(message.Error))
{
writer.WritePropertyName(ErrorPropertyName);
@ -176,14 +176,14 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteCancelInvocationMessage(CancelInvocationMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, CancelInvocationMessageType);
WriteHubInvocationMessageCommon(message, writer, HubProtocolConstants.CancelInvocationMessageType);
writer.WriteEndObject();
}
private void WriteStreamItemMessage(StreamItemMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, ResultMessageType);
WriteHubInvocationMessageCommon(message, writer, HubProtocolConstants.StreamItemMessageType);
writer.WritePropertyName(ItemPropertyName);
_payloadSerializer.Serialize(writer, message.Item);
writer.WriteEndObject();
@ -192,7 +192,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteInvocationMessage(InvocationMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, InvocationMessageType);
WriteHubInvocationMessageCommon(message, writer, HubProtocolConstants.InvocationMessageType);
writer.WritePropertyName(TargetPropertyName);
writer.WriteValue(message.Target);
@ -210,7 +210,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteStreamInvocationMessage(StreamInvocationMessage message, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteHubMessageCommon(message, writer, StreamInvocationMessageType);
WriteHubInvocationMessageCommon(message, writer, HubProtocolConstants.StreamInvocationMessageType);
writer.WritePropertyName(TargetPropertyName);
writer.WriteValue(message.Target);
@ -230,10 +230,22 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
writer.WriteEndArray();
}
private static void WriteHubMessageCommon(HubMessage message, JsonTextWriter writer, int type)
private void WritePingMessage(PingMessage m, JsonTextWriter writer)
{
writer.WriteStartObject();
WriteMessageType(writer, HubProtocolConstants.PingMessageType);
writer.WriteEndObject();
}
private static void WriteHubInvocationMessageCommon(HubInvocationMessage message, JsonTextWriter writer, int type)
{
writer.WritePropertyName(InvocationIdPropertyName);
writer.WriteValue(message.InvocationId);
WriteMessageType(writer, type);
}
private static void WriteMessageType(JsonTextWriter writer, int type)
{
writer.WritePropertyName(TypePropertyName);
writer.WriteValue(type);
}
@ -303,7 +315,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
}
}
private StreamItemMessage BindResultMessage(JObject json, IInvocationBinder binder)
private StreamItemMessage BindStreamItemMessage(JObject json, IInvocationBinder binder)
{
var invocationId = JsonUtils.GetRequiredProperty<string>(json, InvocationIdPropertyName, JTokenType.String);
var result = JsonUtils.GetRequiredProperty<JToken>(json, ItemPropertyName);

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -13,12 +13,6 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class MessagePackHubProtocol : IHubProtocol
{
private const int InvocationMessageType = 1;
private const int StreamItemMessageType = 2;
private const int CompletionMessageType = 3;
private const int StreamInvocationMessageType = 4;
private const int CancelInvocationMessageType = 5;
private const int ErrorResult = 1;
private const int VoidResult = 2;
private const int NonVoidResult = 3;
@ -62,16 +56,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
switch (messageType)
{
case InvocationMessageType:
case HubProtocolConstants.InvocationMessageType:
return CreateInvocationMessage(unpacker, binder);
case StreamInvocationMessageType:
case HubProtocolConstants.StreamInvocationMessageType:
return CreateStreamInvocationMessage(unpacker, binder);
case StreamItemMessageType:
case HubProtocolConstants.StreamItemMessageType:
return CreateStreamItemMessage(unpacker, binder);
case CompletionMessageType:
case HubProtocolConstants.CompletionMessageType:
return CreateCompletionMessage(unpacker, binder);
case CancelInvocationMessageType:
case HubProtocolConstants.CancelInvocationMessageType:
return CreateCancelInvocationMessage(unpacker);
case HubProtocolConstants.PingMessageType:
return PingMessage.Instance;
default:
throw new FormatException($"Invalid message type: {messageType}.");
}
@ -155,7 +151,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
object result = null;
var hasResult = false;
switch(resultKind)
switch (resultKind)
{
case ErrorResult:
error = ReadString(unpacker, "error");
@ -212,6 +208,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
case CancelInvocationMessage cancelInvocationMessage:
WriteCancelInvocationMessage(cancelInvocationMessage, packer);
break;
case PingMessage pingMessage:
WritePingMessage(pingMessage, packer);
break;
default:
throw new FormatException($"Unexpected message type: {message.GetType().Name}");
}
@ -220,7 +219,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteInvocationMessage(InvocationMessage invocationMessage, Packer packer)
{
packer.PackArrayHeader(5);
packer.Pack(InvocationMessageType);
packer.Pack(HubProtocolConstants.InvocationMessageType);
packer.PackString(invocationMessage.InvocationId);
packer.Pack(invocationMessage.NonBlocking);
packer.PackString(invocationMessage.Target);
@ -230,7 +229,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteStreamInvocationMessage(StreamInvocationMessage streamInvocationMessage, Packer packer)
{
packer.PackArrayHeader(4);
packer.Pack(StreamInvocationMessageType);
packer.Pack(HubProtocolConstants.StreamInvocationMessageType);
packer.PackString(streamInvocationMessage.InvocationId);
packer.PackString(streamInvocationMessage.Target);
packer.PackObject(streamInvocationMessage.Arguments, _serializationContext);
@ -239,7 +238,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteStreamingItemMessage(StreamItemMessage streamItemMessage, Packer packer)
{
packer.PackArrayHeader(3);
packer.Pack(StreamItemMessageType);
packer.Pack(HubProtocolConstants.StreamItemMessageType);
packer.PackString(streamItemMessage.InvocationId);
packer.PackObject(streamItemMessage.Item, _serializationContext);
}
@ -253,7 +252,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
packer.PackArrayHeader(3 + (resultKind != VoidResult ? 1 : 0));
packer.Pack(CompletionMessageType);
packer.Pack(HubProtocolConstants.CompletionMessageType);
packer.PackString(completionMessage.InvocationId);
packer.Pack(resultKind);
switch (resultKind)
@ -270,10 +269,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private void WriteCancelInvocationMessage(CancelInvocationMessage cancelInvocationMessage, Packer packer)
{
packer.PackArrayHeader(2);
packer.Pack(CancelInvocationMessageType);
packer.Pack(HubProtocolConstants.CancelInvocationMessageType);
packer.PackString(cancelInvocationMessage.InvocationId);
}
private void WritePingMessage(PingMessage pingMessage, Packer packer)
{
packer.PackArrayHeader(1);
packer.Pack(HubProtocolConstants.PingMessageType);
}
private static string ReadInvocationId(Unpacker unpacker)
{
return ReadString(unpacker, "invocationId");

View File

@ -0,0 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class PingMessage : HubMessage
{
public static readonly PingMessage Instance = new PingMessage();
private PingMessage()
{
}
}
}

View File

@ -3,7 +3,7 @@
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
public class StreamItemMessage : HubMessage
public class StreamItemMessage : HubInvocationMessage
{
public object Item { get; }

View File

@ -147,7 +147,7 @@ namespace Microsoft.AspNetCore.SignalR
return Task.CompletedTask;
}
private async Task WriteAsync(HubConnectionContext connection, HubMessage hubMessage)
private async Task WriteAsync(HubConnectionContext connection, HubInvocationMessage hubMessage)
{
while (await connection.Output.WaitToWriteAsync())
{

View File

@ -303,6 +303,10 @@ namespace Microsoft.AspNetCore.SignalR
}
break;
case PingMessage _:
// We don't care about pings
break;
// Other kind of message we weren't expecting
default:
_logger.UnsupportedMessageReceived(hubMessage.GetType().FullName);

View File

@ -370,7 +370,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
_ackHandler.Dispose();
}
private static async Task WriteAsync(HubConnectionContext connection, HubMessage hubMessage)
private static async Task WriteAsync(HubConnectionContext connection, HubInvocationMessage hubMessage)
{
while (await connection.Output.WaitToWriteAsync())
{
@ -404,7 +404,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
_logger.ReceivedFromChannel(_channelNamePrefix);
var message = DeserializeMessage<HubMessage>(data);
var message = DeserializeMessage<HubInvocationMessage>(data);
var tasks = new List<Task>(_connections.Count);
@ -519,7 +519,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
try
{
var message = DeserializeMessage<HubMessage>(data);
var message = DeserializeMessage<HubInvocationMessage>(data);
await WriteAsync(connection, message);
}
@ -540,7 +540,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
try
{
var message = DeserializeMessage<HubMessage>(data);
var message = DeserializeMessage<HubInvocationMessage>(data);
await WriteAsync(connection, message);
}
@ -558,7 +558,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
{
try
{
var message = DeserializeMessage<HubMessage>(data);
var message = DeserializeMessage<HubInvocationMessage>(data);
var tasks = new List<Task>(group.Connections.Count);
foreach (var groupConnection in group.Connections)

View File

@ -78,7 +78,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
throw new InvalidOperationException("Connection aborted!");
}
if (!string.Equals(message.InvocationId, invocationId))
if (message is HubInvocationMessage hubInvocationMessage && !string.Equals(hubInvocationMessage.InvocationId, invocationId))
{
throw new NotSupportedException("TestClient does not support multiple outgoing invocations!");
}
@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
throw new InvalidOperationException("Connection aborted!");
}
if (!string.Equals(message.InvocationId, invocationId))
if (message is HubInvocationMessage hubInvocationMessage && !string.Equals(hubInvocationMessage.InvocationId, invocationId))
{
throw new NotSupportedException("TestClient does not support multiple outgoing invocations!");
}
@ -150,7 +150,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
var payload = _protocolReaderWriter.WriteMessage(message);
await Application.Writer.WriteAsync(payload);
return message.InvocationId;
return message is HubInvocationMessage hubMessage ? hubMessage.InvocationId : null;
}
public async Task<HubMessage> ReadAsync()

View File

@ -5,13 +5,12 @@ using System;
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.Logging;
using Moq;
using Newtonsoft.Json;
using Xunit;
@ -391,5 +390,38 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await connection.DisposeAsync().OrTimeout();
}
}
[Fact]
public async Task AcceptsPingMessages()
{
var connection = new TestConnection(TransferMode.Text);
var hubConnection = new HubConnection(connection,
new JsonHubProtocol(), new LoggerFactory());
try
{
await hubConnection.StartAsync().OrTimeout();
// Ignore negotiate message
await connection.ReadSentTextMessageAsync().OrTimeout();
// Send an invocation
var invokeTask = hubConnection.InvokeAsync("Foo");
// Receive the ping mid-invocation so we can see that the rest of the flow works fine
await connection.ReceiveJsonMessage(new { type = 6 }).OrTimeout();
// Receive a completion
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).OrTimeout();
// Ensure the invokeTask completes properly
await invokeTask.OrTimeout();
}
finally
{
await hubConnection.DisposeAsync().OrTimeout();
await connection.DisposeAsync().OrTimeout();
}
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -189,13 +189,13 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
// Moq really doesn't handle out parameters well, so to make these tests work I added a manual mock -anurse
private class MockHubProtocol : IHubProtocol
{
private HubMessage _parsed;
private HubInvocationMessage _parsed;
private Exception _error;
public int ParseCalls { get; private set; } = 0;
public int WriteCalls { get; private set; } = 0;
public static MockHubProtocol ReturnOnParse(HubMessage parsed)
public static MockHubProtocol ReturnOnParse(HubInvocationMessage parsed)
{
return new MockHubProtocol
{

View File

@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
@ -14,7 +15,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
public CompositeTestBinder(HubMessage[] hubMessages)
{
_hubMessages = hubMessages;
_hubMessages = hubMessages.Where(IsBindableMessage).ToArray();
}
public Type[] GetParameterTypes(string methodName)
@ -28,5 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
index++;
return new TestBinder(_hubMessages[index - 1]).GetReturnType(invocationId);
}
private bool IsBindableMessage(HubMessage arg)
{
return arg is CompletionMessage ||
arg is InvocationMessage ||
arg is StreamItemMessage ||
arg is StreamInvocationMessage;
}
}
}

View File

@ -56,7 +56,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
new object[] { new StreamInvocationMessage("123", "Target", null, new CustomObject()), false, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":4,\"target\":\"Target\",\"arguments\":[{\"StringProp\":\"SignalR!\",\"DoubleProp\":6.2831853071,\"IntProp\":42,\"DateTimeProp\":\"2017-04-11T00:00:00\",\"NullProp\":null,\"ByteArrProp\":\"AQID\"}]}" },
new object[] { new StreamInvocationMessage("123", "Target", null, new CustomObject()), true, NullValueHandling.Include, "{\"invocationId\":\"123\",\"type\":4,\"target\":\"Target\",\"arguments\":[{\"stringProp\":\"SignalR!\",\"doubleProp\":6.2831853071,\"intProp\":42,\"dateTimeProp\":\"2017-04-11T00:00:00\",\"nullProp\":null,\"byteArrProp\":\"AQID\"}]}" },
new object[] { new CancelInvocationMessage("123"), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":5}" }
new object[] { new CancelInvocationMessage("123"), true, NullValueHandling.Ignore, "{\"invocationId\":\"123\",\"type\":5}" },
new object[] { PingMessage.Instance, true, NullValueHandling.Ignore, "{\"type\":6}" },
};
[Theory]

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -51,6 +51,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
new object[] { new[] { new StreamInvocationMessage("xyz", "method", null, new[] { new CustomObject(), new CustomObject() }) } },
new object[] { new[] { new CancelInvocationMessage("xyz") } },
new object[] { new[] { PingMessage.Instance } },
new object[]
{
@ -59,8 +61,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
new InvocationMessage("xyz", /*nonBlocking*/ true, "method", null, 42, "string", new CustomObject()),
new CompletionMessage("xyz", error: null, result: 42, hasResult: true),
new StreamItemMessage("xyz", null),
PingMessage.Instance,
new StreamInvocationMessage("xyz", "method", null, 42, "string", new CustomObject()),
new CompletionMessage("xyz", error: null, result: new CustomObject(), hasResult: true)
new CompletionMessage("xyz", error: null, result: new CustomObject(), hasResult: true),
}
}
};
@ -273,12 +276,22 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
{
0x04, 0x92, 0x05, 0xa1, 0x30
}
}
},
new object[]
{
PingMessage.Instance,
new byte[]
{
0x02,
0x91, // message array length = 1 (fixarray)
0x06, // type = 6 = Ping (fixnum)
}
},
};
[Theory]
[MemberData(nameof(MessageAndPayload))]
public void UserObjectAreSerializedAsMaps(HubMessage message, byte[] expectedPayload)
public void SerializeMessageTest(HubMessage message, byte[] expectedPayload)
{
using (var memoryStream = new MemoryStream())
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -16,47 +16,60 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
public bool Equals(HubMessage x, HubMessage y)
{
if (!string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal))
// Types should be equal
if (!Equals(x.GetType(), y.GetType()))
{
return false;
}
return InvocationMessagesEqual(x, y) || StreamItemMessagesEqual(x, y) || CompletionMessagesEqual(x, y)
|| StreamInvocationMessagesEqual(x, y) || CancelInvocationMessagesEqual(x, y);
switch (x)
{
case InvocationMessage invocationMessage:
return InvocationMessagesEqual(invocationMessage, (InvocationMessage)y);
case StreamItemMessage streamItemMessage:
return StreamItemMessagesEqual(streamItemMessage, (StreamItemMessage)y);
case CompletionMessage completionMessage:
return CompletionMessagesEqual(completionMessage, (CompletionMessage)y);
case StreamInvocationMessage streamInvocationMessage:
return StreamInvocationMessagesEqual(streamInvocationMessage, (StreamInvocationMessage)y);
case CancelInvocationMessage cancelItemMessage:
return string.Equals(cancelItemMessage.InvocationId, ((CancelInvocationMessage)y).InvocationId, StringComparison.Ordinal);
case PingMessage pingMessage:
// If the types are equal (above), then we're done.
return true;
default:
throw new InvalidOperationException($"Unknown message type: {x.GetType().FullName}");
}
}
private bool CompletionMessagesEqual(HubMessage x, HubMessage y)
private bool CompletionMessagesEqual(CompletionMessage x, CompletionMessage y)
{
return x is CompletionMessage left && y is CompletionMessage right &&
string.Equals(left.Error, right.Error, StringComparison.Ordinal) &&
left.HasResult == right.HasResult &&
(Equals(left.Result, right.Result) || SequenceEqual(left.Result, right.Result));
return string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal) &&
string.Equals(x.Error, y.Error, StringComparison.Ordinal) &&
x.HasResult == y.HasResult &&
(Equals(x.Result, y.Result) || SequenceEqual(x.Result, y.Result));
}
private bool StreamItemMessagesEqual(HubMessage x, HubMessage y)
private bool StreamItemMessagesEqual(StreamItemMessage x, StreamItemMessage y)
{
return x is StreamItemMessage left && y is StreamItemMessage right &&
(Equals(left.Item, right.Item) || SequenceEqual(left.Item, right.Item));
return string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal) &&
(Equals(x.Item, y.Item) || SequenceEqual(x.Item, y.Item));
}
private bool InvocationMessagesEqual(HubMessage x, HubMessage y)
private bool InvocationMessagesEqual(InvocationMessage x, InvocationMessage y)
{
return x is InvocationMessage left && y is InvocationMessage right &&
string.Equals(left.Target, right.Target, StringComparison.Ordinal) &&
ArgumentListsEqual(left.Arguments, right.Arguments) &&
left.NonBlocking == right.NonBlocking;
return string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal) &&
string.Equals(x.Target, y.Target, StringComparison.Ordinal) &&
ArgumentListsEqual(x.Arguments, y.Arguments) &&
x.NonBlocking == y.NonBlocking;
}
private bool StreamInvocationMessagesEqual(HubMessage x, HubMessage y)
private bool StreamInvocationMessagesEqual(StreamInvocationMessage x, StreamInvocationMessage y)
{
return x is StreamInvocationMessage left && y is StreamInvocationMessage right &&
string.Equals(left.Target, right.Target, StringComparison.Ordinal) &&
ArgumentListsEqual(left.Arguments, right.Arguments) &&
left.NonBlocking == right.NonBlocking;
}
private bool CancelInvocationMessagesEqual(HubMessage x, HubMessage y)
{
return x is CancelInvocationMessage && y is CancelInvocationMessage;
return string.Equals(x.InvocationId, y.InvocationId, StringComparison.Ordinal) &&
string.Equals(x.Target, y.Target, StringComparison.Ordinal) &&
ArgumentListsEqual(x.Arguments, y.Arguments) &&
x.NonBlocking == y.NonBlocking;
}
private bool ArgumentListsEqual(object[] left, object[] right)

View File

@ -7,8 +7,8 @@ using System.Linq;
using System.Runtime.Serialization;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Internal;
@ -1309,6 +1309,30 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task AcceptsPingMessages()
{
var serviceProvider = CreateServiceProvider();
var endPoint = serviceProvider.GetService<HubEndPoint<MethodHub>>();
using (var client = new TestClient(false, new JsonHubProtocol()))
{
var endPointLifetime = endPoint.OnConnectedAsync(client.Connection).OrTimeout();
await client.Connected.OrTimeout();
// Send a ping
await client.SendHubMessageAsync(PingMessage.Instance).OrTimeout();
// Now do an invocation to make sure we processed the ping message
var completion = await client.InvokeAsync(nameof(MethodHub.ValueMethod)).OrTimeout();
Assert.NotNull(completion);
client.Dispose();
await endPointLifetime.OrTimeout();
}
}
private static void AssertHubMessage(HubMessage expected, HubMessage actual)
{
// We aren't testing InvocationIds here