Integrating new transport protocols (#257)
This commit is contained in:
parent
080494fc78
commit
78dfd278c1
|
|
@ -2,13 +2,15 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
using Microsoft.AspNetCore.Sockets.Client;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.AspNetCore.Sockets;
|
||||
|
||||
namespace ClientSample
|
||||
{
|
||||
|
|
@ -16,6 +18,14 @@ namespace ClientSample
|
|||
{
|
||||
public static async Task MainAsync(string[] args)
|
||||
{
|
||||
if(args.Contains("--debug"))
|
||||
{
|
||||
Console.WriteLine($"Ready for debugger to attach. Process ID: {Process.GetCurrentProcess().Id}");
|
||||
Console.Write("Press ENTER to Continue");
|
||||
Console.ReadLine();
|
||||
args = args.Except(new[] { "--debug" }).ToArray();
|
||||
}
|
||||
|
||||
var baseUrl = "http://localhost:5000/chat";
|
||||
if (args.Length > 0)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -24,4 +24,4 @@
|
|||
DestinationFolder="$(MSBuildThisFileDirectory)wwwroot\lib\signalr-client" />
|
||||
</Target>
|
||||
|
||||
</Project>
|
||||
</Project>
|
||||
|
|
|
|||
|
|
@ -7,15 +7,15 @@
|
|||
<body>
|
||||
<h1>ASP.NET Sockets</h1>
|
||||
<ul>
|
||||
<li><a href="sockets.html?transport=serverSentEvents">Server Sent Events</a></li>
|
||||
<li><a href="sockets.html?transport=longPolling">Long polling</a></li>
|
||||
<li><a href="sockets.html?transport=webSocket">Web Sockets</a></li>
|
||||
<li><a href="ws.html">"Plain" Web Sockets</a></li>
|
||||
<li><a href="sockets.html?transport=serverSentEvents">Server Sent Events</a></li>
|
||||
<li><a href="sockets.html?transport=webSockets">Web Sockets</a></li>
|
||||
<li><a href="ws.html">Web Sockets (using only Browser APIs)</a></li>
|
||||
</ul>
|
||||
<h1>ASP.NET SignalR (Hubs)</h1>
|
||||
<ul>
|
||||
<li><a href="hubs.html?transport=serverSentEvents">Server Sent Events</a></li>
|
||||
<li><a href="hubs.html?transport=longPolling">Long polling</a></li>
|
||||
<li><a href="hubs.html?transport=serverSentEvents">Server Sent Events</a></li>
|
||||
<li><a href="hubs.html?transport=webSockets">Web Sockets</a></li>
|
||||
</ul>
|
||||
</body>
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ If the `connectionId` parameter is missing, a `400 Bad Request` response is retu
|
|||
|
||||
### Text-based encoding (`supportsBinary` = `false` or not present)
|
||||
|
||||
The body will be formatted as below and encoded in UTF-8. The `Content-Type` response header is set to `application/vnd.microsoft.aspnet.endpoint-messages.v1+text`. Identifiers in square brackets `[]` indicate fields defined below, and parenthesis `()` indicate grouping.
|
||||
The body will be formatted as below and encoded in UTF-8. The `Content-Type` response header is set to `application/vnd.microsoft.aspnetcore.endpoint-messages.v1+text`. Identifiers in square brackets `[]` indicate fields defined below, and parenthesis `()` indicate grouping.
|
||||
|
||||
```
|
||||
T([Length]:[Type]:[Body];)([Length]:[Type]:[Body];)... continues until end of the response body ...
|
||||
|
|
@ -164,7 +164,7 @@ This transport will buffer incomplete frames sent by the server until the full m
|
|||
|
||||
In JavaScript/Browser clients, this encoding requires XHR2 (or similar HTTP request functionality which allows binary data) and TypedArray support.
|
||||
|
||||
The body is encoded as follows. The `Content-Type` response header is set to `application/vnd.microsoft.aspnet.endpoint-messages.v1+binary`. Identifiers in square brackets `[]` indicate fields defined below, and parenthesis `()` indicate grouping. Other symbols indicate ASCII-encoded text in the stream
|
||||
The body is encoded as follows. The `Content-Type` response header is set to `application/vnd.microsoft.aspnetcore.endpoint-messages.v1+binary`. Identifiers in square brackets `[]` indicate fields defined below, and parenthesis `()` indicate grouping. Other symbols indicate ASCII-encoded text in the stream
|
||||
|
||||
```
|
||||
B([Length][Type][Body])([Length][Type][Fin][Body])... continues until end of the response body ...
|
||||
|
|
|
|||
|
|
@ -0,0 +1,130 @@
|
|||
import { Message, MessageType } from './Message';
|
||||
|
||||
let knownTypes = {
|
||||
"T": MessageType.Text,
|
||||
"B": MessageType.Binary,
|
||||
"C": MessageType.Close,
|
||||
"E": MessageType.Error
|
||||
};
|
||||
|
||||
function splitAt(input: string, searchString: string, position: number): [string, number] {
|
||||
let index = input.indexOf(searchString, position);
|
||||
if (index < 0) {
|
||||
return [input.substr(position), input.length];
|
||||
}
|
||||
let left = input.substring(position, index);
|
||||
return [left, index + searchString.length];
|
||||
}
|
||||
|
||||
export namespace ServerSentEventsFormat {
|
||||
export function parse(input: string): Message {
|
||||
// The SSE protocol is pretty simple. We just look at the first line for the type, and then process the remainder.
|
||||
// Binary messages require Base64-decoding and ArrayBuffer support, just like in the other formats below
|
||||
|
||||
if (input.length == 0) {
|
||||
throw "Message is missing header";
|
||||
}
|
||||
|
||||
let [header, offset] = splitAt(input, "\n", 0);
|
||||
let payload = input.substring(offset);
|
||||
|
||||
// Just in case the header used CRLF as the line separator, carve it off
|
||||
if (header.endsWith('\r')) {
|
||||
header = header.substr(0, header.length - 1);
|
||||
}
|
||||
|
||||
// Parse the header
|
||||
var messageType = knownTypes[header];
|
||||
if (messageType === undefined) {
|
||||
throw "Unknown type value: '" + header + "'";
|
||||
}
|
||||
|
||||
if (messageType == MessageType.Binary) {
|
||||
// We need to decode and put in an ArrayBuffer. Throw for now
|
||||
// This will require our own Base64-decoder because the browser
|
||||
// built-in one only decodes to strings and throws if invalid UTF-8
|
||||
// characters are found.
|
||||
throw "TODO: Support for binary messages";
|
||||
}
|
||||
|
||||
// Create the message
|
||||
return new Message(messageType, payload);
|
||||
}
|
||||
}
|
||||
|
||||
export namespace TextMessageFormat {
|
||||
const InvalidPayloadError = new Error("Invalid text message payload");
|
||||
const LengthRegex = /^[0-9]+$/;
|
||||
|
||||
function hasSpace(input: string, offset: number, length: number): boolean {
|
||||
let requiredLength = offset + length;
|
||||
return input.length >= requiredLength;
|
||||
}
|
||||
|
||||
function parseMessage(input: string, position: number): [number, Message] {
|
||||
var offset = position;
|
||||
|
||||
// Read the length
|
||||
var [lenStr, offset] = splitAt(input, ":", offset);
|
||||
|
||||
// parseInt is too leniant, we need a strict check to see if the string is an int
|
||||
|
||||
if (!LengthRegex.test(lenStr)) {
|
||||
throw `Invalid length: '${lenStr}'`;
|
||||
}
|
||||
let length = Number.parseInt(lenStr);
|
||||
|
||||
// Required space is: 3 (type flag, ":", ";") + length (payload len)
|
||||
if (!hasSpace(input, offset, 3 + length)) {
|
||||
throw "Message is incomplete";
|
||||
}
|
||||
|
||||
// Read the type
|
||||
var [typeStr, offset] = splitAt(input, ":", offset);
|
||||
|
||||
// Parse the type
|
||||
var messageType = knownTypes[typeStr];
|
||||
if (messageType === undefined) {
|
||||
throw "Unknown type value: '" + typeStr + "'";
|
||||
}
|
||||
|
||||
// Read the payload
|
||||
var payload = input.substr(offset, length);
|
||||
offset += length;
|
||||
|
||||
// Verify the final trailing character
|
||||
if (input[offset] != ';') {
|
||||
throw "Message missing trailer character";
|
||||
}
|
||||
offset += 1;
|
||||
|
||||
if (messageType == MessageType.Binary) {
|
||||
// We need to decode and put in an ArrayBuffer. Throw for now
|
||||
// This will require our own Base64-decoder because the browser
|
||||
// built-in one only decodes to strings and throws if invalid UTF-8
|
||||
// characters are found.
|
||||
throw "TODO: Support for binary messages";
|
||||
}
|
||||
|
||||
return [offset, new Message(messageType, payload)];
|
||||
}
|
||||
|
||||
export function parse(input: string): Message[] {
|
||||
if (input.length == 0) {
|
||||
return []
|
||||
}
|
||||
|
||||
if (input[0] != 'T') {
|
||||
throw `Unsupported message format: '${input[0]}'`;
|
||||
}
|
||||
|
||||
let messages = [];
|
||||
var offset = 1;
|
||||
while (offset < input.length) {
|
||||
let message;
|
||||
[offset, message] = parseMessage(input, offset);
|
||||
messages.push(message);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
export enum MessageType {
|
||||
Text,
|
||||
Binary,
|
||||
Close,
|
||||
Error
|
||||
}
|
||||
|
||||
export class Message {
|
||||
public type: MessageType;
|
||||
public content: ArrayBuffer | string;
|
||||
|
||||
constructor(type: MessageType, content: ArrayBuffer | string) {
|
||||
this.type = type;
|
||||
this.content = content;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
import { DataReceived, ErrorHandler } from "./Common"
|
||||
import { IHttpClient } from "./HttpClient"
|
||||
import * as Formatters from "./Formatters";
|
||||
|
||||
export interface ITransport {
|
||||
connect(url: string, queryString: string): Promise<void>;
|
||||
|
|
@ -73,7 +74,7 @@ export class ServerSentEventsTransport implements ITransport {
|
|||
private queryString: string;
|
||||
private httpClient: IHttpClient;
|
||||
|
||||
constructor(httpClient :IHttpClient) {
|
||||
constructor(httpClient: IHttpClient) {
|
||||
this.httpClient = httpClient;
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +93,19 @@ export class ServerSentEventsTransport implements ITransport {
|
|||
try {
|
||||
eventSource.onmessage = (e: MessageEvent) => {
|
||||
if (this.onDataReceived) {
|
||||
this.onDataReceived(e.data);
|
||||
// Parse the message
|
||||
let message;
|
||||
try {
|
||||
message = Formatters.ServerSentEventsFormat.parse(e.data);
|
||||
} catch (error) {
|
||||
if (this.onError) {
|
||||
this.onError(error);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: pass the whole message object along
|
||||
this.onDataReceived(message.content);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -138,7 +151,7 @@ export class LongPollingTransport implements ITransport {
|
|||
private pollXhr: XMLHttpRequest;
|
||||
private shouldPoll: boolean;
|
||||
|
||||
constructor(httpClient :IHttpClient) {
|
||||
constructor(httpClient: IHttpClient) {
|
||||
this.httpClient = httpClient;
|
||||
}
|
||||
|
||||
|
|
@ -160,7 +173,21 @@ export class LongPollingTransport implements ITransport {
|
|||
pollXhr.onload = () => {
|
||||
if (pollXhr.status == 200) {
|
||||
if (this.onDataReceived) {
|
||||
this.onDataReceived(pollXhr.response);
|
||||
// Parse the messages
|
||||
let messages;
|
||||
try {
|
||||
messages = Formatters.TextMessageFormat.parse(pollXhr.response);
|
||||
} catch (error) {
|
||||
if (this.onError) {
|
||||
this.onError(error);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
messages.forEach((message) => {
|
||||
// TODO: pass the whole message object along
|
||||
this.onDataReceived(message.content)
|
||||
});
|
||||
}
|
||||
this.poll(url);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,14 +2,15 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Sockets.Client.Internal;
|
||||
using Microsoft.AspNetCore.Sockets.Formatters;
|
||||
using Microsoft.Extensions.Internal;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
|
|
@ -84,15 +85,17 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
}
|
||||
else
|
||||
{
|
||||
var ms = new MemoryStream();
|
||||
await response.Content.CopyToAsync(ms);
|
||||
var message = new Message(ms.ToArray(), MessageType.Text);
|
||||
// Read the whole payload
|
||||
var payload = await response.Content.ReadAsByteArrayAsync();
|
||||
|
||||
while (await _application.Output.WaitToWriteAsync(cancellationToken))
|
||||
foreach (var message in ReadMessages(payload))
|
||||
{
|
||||
if (_application.Output.TryWrite(message))
|
||||
while (!_application.Output.TryWrite(message))
|
||||
{
|
||||
break;
|
||||
if (cancellationToken.IsCancellationRequested || !await _application.Output.WaitToWriteAsync(cancellationToken))
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -114,6 +117,28 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
}
|
||||
}
|
||||
|
||||
private IEnumerable<Message> ReadMessages(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
if (payload.Length == 0)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
var messageFormat = MessageFormatter.GetFormat(payload[0]);
|
||||
payload = payload.Slice(1);
|
||||
|
||||
while (payload.Length > 0)
|
||||
{
|
||||
if (!MessageFormatter.TryParseMessage(payload, messageFormat, out var message, out var consumed))
|
||||
{
|
||||
throw new InvalidDataException("Invalid message payload from server");
|
||||
}
|
||||
|
||||
payload = payload.Slice(consumed);
|
||||
yield return message;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendMessages(Uri sendUrl, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
|
|
|
|||
|
|
@ -7,14 +7,20 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
|
|||
{
|
||||
public static class MessageFormatter
|
||||
{
|
||||
public static readonly byte TextFormatIndicator = (byte)'T';
|
||||
public static readonly byte BinaryFormatIndicator = (byte)'B';
|
||||
|
||||
public static readonly string TextContentType = "application/vnd.microsoft.aspnetcore.endpoint-messages.v1+text";
|
||||
public static readonly string BinaryContentType = "application/vnd.microsoft.aspnetcore.endpoint-messages.v1+binary";
|
||||
|
||||
public static bool TryFormatMessage(Message message, Span<byte> buffer, MessageFormat format, out int bytesWritten)
|
||||
{
|
||||
if (!message.EndOfMessage)
|
||||
{
|
||||
// This is a truely exceptional condition since we EXPECT callers to have already
|
||||
// This is truly an exceptional condition since we EXPECT callers to have already
|
||||
// buffered incomplete messages and synthesized the correct, complete message before
|
||||
// giving it to us. Hence we throw, instead of returning false.
|
||||
throw new InvalidOperationException("Cannot format message where endOfMessage is false using this format");
|
||||
throw new ArgumentException("Cannot format message where endOfMessage is false using this format", nameof(message));
|
||||
}
|
||||
|
||||
return format == MessageFormat.Text ?
|
||||
|
|
@ -28,5 +34,46 @@ namespace Microsoft.AspNetCore.Sockets.Formatters
|
|||
TextMessageFormatter.TryParseMessage(buffer, out message, out bytesConsumed) :
|
||||
BinaryMessageFormatter.TryParseMessage(buffer, out message, out bytesConsumed);
|
||||
}
|
||||
|
||||
public static string GetContentType(MessageFormat messageFormat)
|
||||
{
|
||||
switch (messageFormat)
|
||||
{
|
||||
case MessageFormat.Text: return TextContentType;
|
||||
case MessageFormat.Binary: return BinaryContentType;
|
||||
default: throw new ArgumentException($"Invalid message format: {messageFormat}", nameof(messageFormat));
|
||||
}
|
||||
}
|
||||
|
||||
public static byte GetFormatIndicator(MessageFormat messageFormat)
|
||||
{
|
||||
switch (messageFormat)
|
||||
{
|
||||
case MessageFormat.Text: return TextFormatIndicator;
|
||||
case MessageFormat.Binary: return BinaryFormatIndicator;
|
||||
default: throw new ArgumentException($"Invalid message format: {messageFormat}", nameof(messageFormat));
|
||||
}
|
||||
}
|
||||
|
||||
public static MessageFormat GetFormat(byte formatIndicator)
|
||||
{
|
||||
// Can't use switch because our "constants" are not consts, they're "static readonly" (which is good, because they are public)
|
||||
if (formatIndicator == TextFormatIndicator)
|
||||
{
|
||||
return MessageFormat.Text;
|
||||
}
|
||||
|
||||
if (formatIndicator == BinaryFormatIndicator)
|
||||
{
|
||||
return MessageFormat.Binary;
|
||||
}
|
||||
|
||||
throw new ArgumentException($"Invalid message format: 0x{formatIndicator:X}", nameof(formatIndicator));
|
||||
}
|
||||
|
||||
public static bool TryParseMessage(ReadOnlySpan<byte> payload, object messageFormat)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,12 +7,16 @@ using System.Threading;
|
|||
using System.Threading.Tasks;
|
||||
using System.Threading.Tasks.Channels;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Sockets.Formatters;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets.Transports
|
||||
{
|
||||
public class LongPollingTransport : IHttpTransport
|
||||
{
|
||||
// REVIEW: This size?
|
||||
internal const int MaxBufferSize = 4096;
|
||||
|
||||
public static readonly string Name = "longPolling";
|
||||
private readonly ReadableChannel<Message> _application;
|
||||
private readonly ILogger _logger;
|
||||
|
|
@ -34,13 +38,44 @@ namespace Microsoft.AspNetCore.Sockets.Transports
|
|||
return;
|
||||
}
|
||||
|
||||
Message message;
|
||||
if (_application.TryRead(out message))
|
||||
// TODO: Add support for binary protocol
|
||||
var messageFormat = MessageFormat.Text;
|
||||
context.Response.ContentType = MessageFormatter.GetContentType(messageFormat);
|
||||
|
||||
var writer = context.Response.Body.AsPipelineWriter();
|
||||
var alloc = writer.Alloc(minimumSize: 1);
|
||||
alloc.WriteBigEndian(MessageFormatter.GetFormatIndicator(messageFormat));
|
||||
|
||||
while (_application.TryRead(out var message))
|
||||
{
|
||||
var buffer = alloc.Memory.Span;
|
||||
|
||||
_logger.LogDebug("Writing {0} byte message to response", message.Payload.Length);
|
||||
context.Response.ContentLength = message.Payload.Length;
|
||||
await context.Response.Body.WriteAsync(message.Payload, 0, message.Payload.Length);
|
||||
|
||||
// Try to format the message
|
||||
if (!MessageFormatter.TryFormatMessage(message, buffer, messageFormat, out var written))
|
||||
{
|
||||
// We need to expand the buffer
|
||||
// REVIEW: I'm not sure I fully understand the "right" pattern here...
|
||||
alloc.Ensure(MaxBufferSize);
|
||||
buffer = alloc.Memory.Span;
|
||||
|
||||
// Try one more time
|
||||
if (!MessageFormatter.TryFormatMessage(message, buffer, messageFormat, out written))
|
||||
{
|
||||
// Message too large
|
||||
throw new InvalidOperationException($"Message is too large to write. Maximum allowed message size is: {MaxBufferSize}");
|
||||
}
|
||||
}
|
||||
|
||||
// Update the buffer and commit
|
||||
alloc.Advance(written);
|
||||
alloc.Commit();
|
||||
alloc = writer.Alloc();
|
||||
buffer = alloc.Memory.Span;
|
||||
}
|
||||
|
||||
await alloc.FlushAsync();
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -2,10 +2,12 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Threading.Tasks.Channels;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Sockets.Formatters;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets.Transports
|
||||
|
|
@ -27,16 +29,37 @@ namespace Microsoft.AspNetCore.Sockets.Transports
|
|||
context.Response.ContentType = "text/event-stream";
|
||||
context.Response.Headers["Cache-Control"] = "no-cache";
|
||||
context.Response.Headers["Content-Encoding"] = "identity";
|
||||
|
||||
await context.Response.Body.FlushAsync();
|
||||
|
||||
var pipe = context.Response.Body.AsPipelineWriter();
|
||||
|
||||
try
|
||||
{
|
||||
while (await _application.WaitToReadAsync(token))
|
||||
{
|
||||
var buffer = pipe.Alloc();
|
||||
while (_application.TryRead(out var message))
|
||||
{
|
||||
await Send(context, message);
|
||||
if (!ServerSentEventsMessageFormatter.TryFormatMessage(message, buffer.Memory.Span, out var written))
|
||||
{
|
||||
// We need to expand the buffer
|
||||
// REVIEW: I'm not sure I fully understand the "right" pattern here...
|
||||
buffer.Ensure(LongPollingTransport.MaxBufferSize);
|
||||
|
||||
// Try one more time
|
||||
if (!ServerSentEventsMessageFormatter.TryFormatMessage(message, buffer.Memory.Span, out written))
|
||||
{
|
||||
// Message too large
|
||||
throw new InvalidOperationException($"Message is too large to write. Maximum allowed message size is: {LongPollingTransport.MaxBufferSize}");
|
||||
}
|
||||
}
|
||||
buffer.Advance(written);
|
||||
buffer.Commit();
|
||||
buffer = pipe.Alloc();
|
||||
}
|
||||
|
||||
await buffer.FlushAsync();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
|
|
@ -44,26 +67,5 @@ namespace Microsoft.AspNetCore.Sockets.Transports
|
|||
// Closed connection
|
||||
}
|
||||
}
|
||||
|
||||
private async Task Send(HttpContext context, Message message)
|
||||
{
|
||||
// TODO: Pooled buffers
|
||||
// 8 = 6(data: ) + 2 (\n\n)
|
||||
_logger.LogDebug("Sending {0} byte message to Server-Sent Events client", message.Payload.Length);
|
||||
var buffer = new byte[8 + message.Payload.Length];
|
||||
var at = 0;
|
||||
buffer[at++] = (byte)'d';
|
||||
buffer[at++] = (byte)'a';
|
||||
buffer[at++] = (byte)'t';
|
||||
buffer[at++] = (byte)'a';
|
||||
buffer[at++] = (byte)':';
|
||||
buffer[at++] = (byte)' ';
|
||||
message.Payload.CopyTo(new Span<byte>(buffer, at, message.Payload.Length));
|
||||
at += message.Payload.Length;
|
||||
buffer[at++] = (byte)'\n';
|
||||
buffer[at++] = (byte)'\n';
|
||||
await context.Response.Body.WriteAsync(buffer, 0, at);
|
||||
await context.Response.Body.FlushAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
import { TextMessageFormat, ServerSentEventsFormat } from "../../src/Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
|
||||
import { Message, MessageType } from "../../src/Microsoft.AspNetCore.SignalR.Client.TS/Message";
|
||||
|
||||
describe("Text Message Formatter", () => {
|
||||
it("should return empty array on empty input", () => {
|
||||
let messages = TextMessageFormat.parse("");
|
||||
expect(messages).toEqual([]);
|
||||
});
|
||||
([
|
||||
["T0:T:;", [new Message(MessageType.Text, "")]],
|
||||
["T0:C:;", [new Message(MessageType.Close, "")]],
|
||||
["T0:E:;", [new Message(MessageType.Error, "")]],
|
||||
["T5:T:Hello;", [new Message(MessageType.Text, "Hello")]],
|
||||
["T5:T:Hello;5:C:World;5:E:Error;", [new Message(MessageType.Text, "Hello"), new Message(MessageType.Close, "World"), new Message(MessageType.Error, "Error")]],
|
||||
] as [[string, Message[]]]).forEach(([payload, expected_messages]) => {
|
||||
it(`should parse '${payload}' correctly`, () => {
|
||||
let messages = TextMessageFormat.parse(payload);
|
||||
expect(messages).toEqual(expected_messages);
|
||||
})
|
||||
});
|
||||
|
||||
([
|
||||
["TABC", "Invalid length: 'ABC'"],
|
||||
["X1:T:A", "Unsupported message format: 'X'"],
|
||||
["T1:T:A;12ab34:", "Invalid length: '12ab34'"],
|
||||
["T1:T:A;1:asdf:", "Unknown type value: 'asdf'"],
|
||||
["T1:T:A;1::", "Message is incomplete"],
|
||||
["T1:T:A;1:AB:", "Message is incomplete"],
|
||||
["T1:T:A;5:T:A", "Message is incomplete"],
|
||||
["T1:T:A;5:T:AB", "Message is incomplete"],
|
||||
["T1:T:A;5:T:ABCDE", "Message is incomplete"],
|
||||
["T1:T:A;5:X:ABCDE", "Message is incomplete"],
|
||||
["T1:T:A;5:T:ABCDEF", "Message missing trailer character"],
|
||||
] as [[string, string]]).forEach(([payload, expected_error]) => {
|
||||
it(`should fail to parse '${payload}'`, () => {
|
||||
expect(() => TextMessageFormat.parse(payload)).toThrow(expected_error);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("Server-Sent Events Formatter", () => {
|
||||
([
|
||||
["", "Message is missing header"],
|
||||
["A", "Unknown type value: 'A'"],
|
||||
["BOO\r\nBlarg", "Unknown type value: 'BOO'"]
|
||||
] as [string, string][]).forEach(([payload, expected_error]) => {
|
||||
it(`should fail to parse '${payload}`, () => {
|
||||
expect(() => ServerSentEventsFormat.parse(payload)).toThrow(expected_error);
|
||||
});
|
||||
});
|
||||
|
||||
([
|
||||
["T\r\nTest", new Message(MessageType.Text, "Test")],
|
||||
["C\r\nTest", new Message(MessageType.Close, "Test")],
|
||||
["E\r\nTest", new Message(MessageType.Error, "Test")],
|
||||
["T", new Message(MessageType.Text, "")],
|
||||
["T\r\n", new Message(MessageType.Text, "")],
|
||||
["T\r\nFoo\r\nBar", new Message(MessageType.Text, "Foo\r\nBar")]
|
||||
] as [string, Message][]).forEach(([payload, expected_message]) => {
|
||||
it(`should parse '${payload}' correctly`, () => {
|
||||
let message = ServerSentEventsFormat.parse(payload);
|
||||
expect(message).toEqual(expected_message);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -364,7 +364,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
|
|||
var content = string.Empty;
|
||||
if (request.RequestUri.AbsolutePath.EndsWith("/poll"))
|
||||
{
|
||||
content = "42";
|
||||
content = "T2:T:42;";
|
||||
}
|
||||
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent(content) };
|
||||
});
|
||||
|
|
|
|||
|
|
@ -88,9 +88,10 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests
|
|||
public void WriteInvalidMessages()
|
||||
{
|
||||
var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false);
|
||||
var ex = Assert.Throws<InvalidOperationException>(() =>
|
||||
var ex = Assert.Throws<ArgumentException>(() =>
|
||||
MessageFormatter.TryFormatMessage(message, Span<byte>.Empty, MessageFormat.Binary, out var written));
|
||||
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);
|
||||
Assert.Equal($"Cannot format message where endOfMessage is false using this format{Environment.NewLine}Parameter name: message", ex.Message);
|
||||
Assert.Equal("message", ex.ParamName);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
|
|
|
|||
|
|
@ -75,9 +75,10 @@ namespace Microsoft.AspNetCore.Sockets.Formatters.Tests
|
|||
public void WriteInvalidMessages()
|
||||
{
|
||||
var message = new Message(new byte[0], MessageType.Binary, endOfMessage: false);
|
||||
var ex = Assert.Throws<InvalidOperationException>(() =>
|
||||
var ex = Assert.Throws<ArgumentException>(() =>
|
||||
MessageFormatter.TryFormatMessage(message, Span<byte>.Empty, MessageFormat.Text, out var written));
|
||||
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);
|
||||
Assert.Equal($"Cannot format message where endOfMessage is false using this format{Environment.NewLine}Parameter name: message", ex.Message);
|
||||
Assert.Equal("message", ex.ParamName);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
|
|
|
|||
|
|
@ -48,7 +48,37 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
await poll.ProcessRequestAsync(context, context.RequestAborted);
|
||||
|
||||
Assert.Equal(200, context.Response.StatusCode);
|
||||
Assert.Equal("Hello World", Encoding.UTF8.GetString(ms.ToArray()));
|
||||
Assert.Equal("T11:T:Hello World;", Encoding.UTF8.GetString(ms.ToArray()));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MultipleFramesSentAsSingleResponse()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<Message>();
|
||||
var context = new DefaultHttpContext();
|
||||
var poll = new LongPollingTransport(channel, new LoggerFactory());
|
||||
var ms = new MemoryStream();
|
||||
context.Response.Body = ms;
|
||||
|
||||
await channel.Out.WriteAsync(new Message(
|
||||
Encoding.UTF8.GetBytes("Hello"),
|
||||
MessageType.Text,
|
||||
endOfMessage: true));
|
||||
await channel.Out.WriteAsync(new Message(
|
||||
Encoding.UTF8.GetBytes(" "),
|
||||
MessageType.Text,
|
||||
endOfMessage: true));
|
||||
await channel.Out.WriteAsync(new Message(
|
||||
Encoding.UTF8.GetBytes("World"),
|
||||
MessageType.Text,
|
||||
endOfMessage: true));
|
||||
|
||||
Assert.True(channel.Out.TryComplete());
|
||||
|
||||
await poll.ProcessRequestAsync(context, context.RequestAborted);
|
||||
|
||||
Assert.Equal(200, context.Response.StatusCode);
|
||||
Assert.Equal("T5:T:Hello;1:T: ;5:T:World;", Encoding.UTF8.GetString(ms.ToArray()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,8 +30,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
Assert.Equal("no-cache", context.Response.Headers["Cache-Control"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SSEAddsAppropriateFraming()
|
||||
[Theory]
|
||||
[InlineData("Hello World", "data: T\r\ndata: Hello World\r\n\r\n")]
|
||||
[InlineData("Hello\nWorld", "data: T\r\ndata: Hello\r\ndata: World\r\n\r\n")]
|
||||
[InlineData("Hello\r\nWorld", "data: T\r\ndata: Hello\r\ndata: World\r\n\r\n")]
|
||||
public async Task SSEAddsAppropriateFraming(string message, string expected)
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<Message>();
|
||||
var context = new DefaultHttpContext();
|
||||
|
|
@ -40,7 +43,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
context.Response.Body = ms;
|
||||
|
||||
await channel.Out.WriteAsync(new Message(
|
||||
Encoding.UTF8.GetBytes("Hello World"),
|
||||
Encoding.UTF8.GetBytes(message),
|
||||
MessageType.Text,
|
||||
endOfMessage: true));
|
||||
|
||||
|
|
@ -48,7 +51,6 @@ namespace Microsoft.AspNetCore.Sockets.Tests
|
|||
|
||||
await sse.ProcessRequestAsync(context, context.RequestAborted);
|
||||
|
||||
var expected = "data: Hello World\n\n";
|
||||
Assert.Equal(expected, Encoding.UTF8.GetString(ms.ToArray()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue