Text Protocol Formatter (#187)

This commit is contained in:
Andrew Stanton-Nurse 2017-02-14 16:00:51 -08:00 committed by GitHub
parent 9889ab1bd7
commit a728e1da41
35 changed files with 603 additions and 76 deletions

View File

@ -14,6 +14,7 @@ env:
global:
- DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true
- DOTNET_CLI_TELEMETRY_OPTOUT: 1
- SIGNALR_TESTS_VERBOSE: 1
mono:
- 4.0.5
python:

View File

@ -1,7 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26127.0
VisualStudioVersion = 15.0.26208.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-5398-4884-87E4-B816698CDE65}"
EndProject
@ -58,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.Signal
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.Sockets.Common", "src\Microsoft.AspNetCore.Sockets.Common\Microsoft.AspNetCore.Sockets.Common.csproj", "{F3EFFD9F-DD85-48A2-9B11-83A133ECC099}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.Sockets.Common.Tests", "test\Microsoft.AspNetCore.Sockets.Common.Tests\Microsoft.AspNetCore.Sockets.Common.Tests.csproj", "{B0D32729-48AA-4841-B52A-2A61B60EED61}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -152,6 +153,10 @@ Global
{F3EFFD9F-DD85-48A2-9B11-83A133ECC099}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F3EFFD9F-DD85-48A2-9B11-83A133ECC099}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F3EFFD9F-DD85-48A2-9B11-83A133ECC099}.Release|Any CPU.Build.0 = Release|Any CPU
{B0D32729-48AA-4841-B52A-2A61B60EED61}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B0D32729-48AA-4841-B52A-2A61B60EED61}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B0D32729-48AA-4841-B52A-2A61B60EED61}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B0D32729-48AA-4841-B52A-2A61B60EED61}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -179,5 +184,6 @@ Global
{354335AB-CEE9-4434-A641-78058F6EFE56} = {DA69F624-5398-4884-87E4-B816698CDE65}
{455B68D2-C5B6-4BF4-A685-964B07AFAAF8} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
{F3EFFD9F-DD85-48A2-9B11-83A133ECC099} = {DA69F624-5398-4884-87E4-B816698CDE65}
{B0D32729-48AA-4841-B52A-2A61B60EED61} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
EndGlobalSection
EndGlobal

View File

@ -61,7 +61,7 @@ namespace ClientSample
var line = Console.ReadLine();
logger.LogInformation("Sending: {0}", line);
await connection.SendAsync(Encoding.UTF8.GetBytes("Hello World"), Format.Text);
await connection.SendAsync(Encoding.UTF8.GetBytes("Hello World"), MessageType.Text);
}
logger.LogInformation("Send loop terminated");
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -39,7 +39,7 @@ namespace SocialWeather
var ms = new MemoryStream();
await formatter.WriteAsync(data, ms);
var buffer = ReadableBuffer.Create(ms.ToArray()).Preserve();
await connection.Transport.Output.WriteAsync(new Message(buffer, Format.Binary, endOfMessage: true));
await connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true));
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -26,7 +26,7 @@ namespace SocialWeather.Pipe
Weather weather = (Weather)(-1);
string zipCode = tokens.Length > 3 ? tokens[3] : string.Empty;
if(tokens.Length == 0 || !int.TryParse(tokens[0], out temperature))
if (tokens.Length == 0 || !int.TryParse(tokens[0], out temperature))
{
temperature = int.MinValue;
}

View File

@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: WeatherReport.proto
#pragma warning disable 1591, 0612, 3021

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
@ -29,7 +29,7 @@ namespace SocketsSample.EndPoints
using (message)
{
// We can avoid the copy here but we'll deal with that later
await Broadcast(message.Payload.Buffer, message.MessageFormat, message.EndOfMessage);
await Broadcast(message.Payload.Buffer, message.Type, message.EndOfMessage);
}
}
}
@ -44,10 +44,10 @@ namespace SocketsSample.EndPoints
private Task Broadcast(string text)
{
return Broadcast(ReadableBuffer.Create(Encoding.UTF8.GetBytes(text)), Format.Text, endOfMessage: true);
return Broadcast(ReadableBuffer.Create(Encoding.UTF8.GetBytes(text)), MessageType.Text, endOfMessage: true);
}
private Task Broadcast(ReadableBuffer payload, Format format, bool endOfMessage)
private Task Broadcast(ReadableBuffer payload, MessageType format, bool endOfMessage)
{
var tasks = new List<Task>(Connections.Count);

View File

@ -98,7 +98,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
_logger.LogInformation("Sending Invocation #{0}", descriptor.Id);
// TODO: Format.Text - who, where and when decides about the format of outgoing messages
await _connection.SendAsync(ms.ToArray(), Format.Text, cancellationToken);
await _connection.SendAsync(ms.ToArray(), MessageType.Text, cancellationToken);
_logger.LogInformation("Sending Invocation #{0} complete", descriptor.Id);
@ -134,7 +134,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
ReceiveData receiveData = new ReceiveData();
while (await _connection.ReceiveAsync(receiveData, cancellationToken))
{
var message
var message
= await _adapter.ReadMessageAsync(new MemoryStream(receiveData.Data), _binder, cancellationToken);
switch (message)

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -278,7 +278,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis
private async Task WriteAsync(Connection connection, byte[] data)
{
var buffer = ReadableBuffer.Create(data).Preserve();
var message = new Message(buffer, Format.Text, endOfMessage: true);
var message = new Message(buffer, MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -130,7 +130,7 @@ namespace Microsoft.AspNetCore.SignalR
await invocationAdapter.WriteMessageAsync(invocation, stream);
var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve();
var message = new Message(buffer, Format.Text, endOfMessage: true);
var message = new Message(buffer, MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -233,7 +233,7 @@ namespace Microsoft.AspNetCore.SignalR
await invocationAdapter.WriteMessageAsync(result, outStream);
var buffer = ReadableBuffer.Create(outStream.ToArray()).Preserve();
var outMessage = new Message(buffer, Format.Text, endOfMessage: true);
var outMessage = new Message(buffer, MessageType.Text, endOfMessage: true);
while (await connection.Transport.Output.WaitToWriteAsync())
{

View File

@ -57,7 +57,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
using (message)
{
receiveData.Format = message.MessageFormat;
receiveData.MessageType = message.Type;
receiveData.Data = message.Payload.Buffer.ToArray();
return true;
}
@ -80,14 +80,14 @@ namespace Microsoft.AspNetCore.Sockets.Client
return false;
}
public Task<bool> SendAsync(byte[] data, Format format)
public Task<bool> SendAsync(byte[] data, MessageType type)
{
return SendAsync(data, format, CancellationToken.None);
return SendAsync(data, type, CancellationToken.None);
}
public async Task<bool> SendAsync(byte[] data, Format format, CancellationToken cancellationToken)
public async Task<bool> SendAsync(byte[] data, MessageType type, CancellationToken cancellationToken)
{
var message = new Message(ReadableBuffer.Create(data).Preserve(), format);
var message = new Message(ReadableBuffer.Create(data).Preserve(), type);
while (await Output.WaitToWriteAsync(cancellationToken))
{

View File

@ -82,7 +82,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
var ms = new MemoryStream();
await response.Content.CopyToAsync(ms);
var message = new Message(ReadableBuffer.Create(ms.ToArray()).Preserve(), Format.Text);
var message = new Message(ReadableBuffer.Create(ms.ToArray()).Preserve(), MessageType.Text);
while (await _application.Output.WaitToWriteAsync(cancellationToken))
{

View File

@ -7,6 +7,6 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
public byte[] Data { get; set; }
public Format Format { get; set; }
public MessageType MessageType { get; set; }
}
}

View File

@ -1,26 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Text.Formatting;
namespace Microsoft.AspNetCore.Sockets
{
public struct Message : IDisposable
{
public bool EndOfMessage { get; }
public Format MessageFormat { get; }
public MessageType Type { get; }
public PreservedBuffer Payload { get; }
public Message(PreservedBuffer payload, Format messageFormat)
: this(payload, messageFormat, endOfMessage: true)
public Message(PreservedBuffer payload, MessageType type)
: this(payload, type, endOfMessage: true)
{
}
public Message(PreservedBuffer payload, Format messageFormat, bool endOfMessage)
public Message(PreservedBuffer payload, MessageType type, bool endOfMessage)
{
MessageFormat = messageFormat;
Type = type;
EndOfMessage = endOfMessage;
Payload = payload;
}

View File

@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.AspNetCore.Sockets
{
public enum MessageFormat
{
Text,
Binary
}
}

View File

@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
namespace Microsoft.AspNetCore.Sockets
{
public static class MessageFormatter
{
public static bool TryFormatMessage(Message message, Span<byte> buffer, MessageFormat format, out int bytesWritten)
{
if (!message.EndOfMessage)
{
// This is a truely exceptional condition since we EXPECT callers to have already
// buffered incomplete messages and synthesized the correct, complete message before
// giving it to us. Hence we throw, instead of returning false.
throw new InvalidOperationException("Cannot format message where endOfMessage is false using this format");
}
return format == MessageFormat.Text ?
TextMessageFormatter.TryFormatMessage(message, buffer, out bytesWritten) :
throw new NotImplementedException();
}
public static bool TryParseMessage(ReadOnlySpan<byte> buffer, MessageFormat format, out Message message, out int bytesConsumed)
{
return format == MessageFormat.Text ?
TextMessageFormatter.TryParseMessage(buffer, out message, out bytesConsumed) :
throw new NotImplementedException();
}
}
}

View File

@ -1,11 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.Sockets
{
public enum Format
public enum MessageType
{
Text,
Binary
Binary,
Close,
Error
}
}

View File

@ -12,6 +12,7 @@
<ItemGroup>
<PackageReference Include="System.IO.Pipelines" Version="0.1.0-*" />
<PackageReference Include="System.Text.Primitives" Version="0.1.0-*" />
<PackageReference Include="System.Threading.Tasks.Channels" Version="0.1.0-*" />
</ItemGroup>

View File

@ -0,0 +1,248 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
namespace Microsoft.AspNetCore.Sockets
{
internal static class TextMessageFormatter
{
private const byte FieldDelimiter = (byte)':';
private const byte MessageDelimiter = (byte)';';
private const byte TextTypeFlag = (byte)'T';
private const byte BinaryTypeFlag = (byte)'B';
private const byte CloseTypeFlag = (byte)'C';
private const byte ErrorTypeFlag = (byte)'E';
public static bool TryFormatMessage(Message message, Span<byte> buffer, out int bytesWritten)
{
// Calculate the length, it's the number of characters for text messages, but number of base64 characters for binary
var length = message.Payload.Buffer.Length;
if (message.Type == MessageType.Binary)
{
length = (int)(4 * Math.Ceiling(((double)message.Payload.Buffer.Length / 3)));
}
// Write the length as a string
int written = 0;
if (!length.TryFormat(buffer, out int lengthLen, default(TextFormat), EncodingData.InvariantUtf8))
{
bytesWritten = 0;
return false;
}
written += lengthLen;
buffer = buffer.Slice(lengthLen);
// We need at least 4 more characters of space (':', type flag, ':', and eventually the terminating ';')
// We'll still need to double-check that we have space for the terminator after we write the payload,
// but this way we can exit early if the buffer is way too small.
if (buffer.Length < 4)
{
bytesWritten = 0;
return false;
}
buffer[0] = FieldDelimiter;
if (!TryFormatType(message.Type, buffer.Slice(1, 1)))
{
bytesWritten = 0;
return false;
}
buffer[2] = FieldDelimiter;
buffer = buffer.Slice(3);
written += 3;
// Payload
if (message.Type == MessageType.Binary)
{
// Encode the payload. For now, we make it an array and use the old-fashioned types because we need to mirror packages
// I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse
var payload = Convert.ToBase64String(message.Payload.Buffer.ToArray());
if (!TextEncoder.Utf8.TryEncodeString(payload, buffer, out int payloadWritten))
{
bytesWritten = 0;
return false;
}
written += payloadWritten;
buffer = buffer.Slice(payloadWritten);
}
else
{
if (buffer.Length < message.Payload.Buffer.Length)
{
bytesWritten = 0;
return false;
}
message.Payload.Buffer.CopyTo(buffer.Slice(0, message.Payload.Buffer.Length));
written += message.Payload.Buffer.Length;
buffer = buffer.Slice(message.Payload.Buffer.Length);
}
// Terminator
if (buffer.Length < 1)
{
bytesWritten = 0;
return false;
}
buffer[0] = MessageDelimiter;
bytesWritten = written + 1;
return true;
}
internal static bool TryParseMessage(ReadOnlySpan<byte> buffer, out Message message, out int bytesConsumed)
{
// Read until the first ':' to find the length
var consumedSoFar = 0;
var colonIndex = buffer.IndexOf(FieldDelimiter);
if (colonIndex < 0)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
consumedSoFar += colonIndex;
var lengthSpan = buffer.Slice(0, colonIndex);
buffer = buffer.Slice(colonIndex);
// Parse the length
if (!PrimitiveParser.TryParseInt32(lengthSpan, out var length, out var consumedByLength, EncodingData.InvariantUtf8) || consumedByLength < lengthSpan.Length)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
// Check if there's enough space in the buffer to even bother continuing
// There are at least 4 characters we still expect to see: ':', type flag, ':', ';'.
if (buffer.Length < 4)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
// Verify that we have the ':' after the type flag.
if (buffer[0] != FieldDelimiter)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
// We already know that index 0 is the ':', so next is the type flag at index '1'.
if (!TryParseType(buffer[1], out var messageType))
{
message = default(Message);
bytesConsumed = 0;
}
// Verify that we have the ':' after the type flag.
if (buffer[2] != FieldDelimiter)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
// Slice off ':[Type]:' and check the remaining length
buffer = buffer.Slice(3);
consumedSoFar += 3;
// We expect to see <length>+1 more characters. Since <length> is the exact number of bytes in the text (even if base64-encoded)
// and we expect to see the ';'
if (buffer.Length < length + 1)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
// Grab the payload buffer
var payloadBuffer = buffer.Slice(0, length);
buffer = buffer.Slice(length);
consumedSoFar += length;
// Parse the payload. For now, we make it an array and use the old-fashioned types.
// I've filed https://github.com/aspnet/SignalR/issues/192 to update this. -anurse
var payloadArray = payloadBuffer.ToArray();
PreservedBuffer payload;
if (messageType == MessageType.Binary)
{
byte[] decoded;
try
{
var str = Encoding.UTF8.GetString(payloadArray);
decoded = Convert.FromBase64String(str);
}
catch
{
// Decoding failure
message = default(Message);
bytesConsumed = 0;
return false;
}
payload = ReadableBuffer.Create(decoded).Preserve();
}
else
{
payload = ReadableBuffer.Create(payloadArray).Preserve();
}
// Verify the trailer
if (buffer.Length < 1 || buffer[0] != MessageDelimiter)
{
message = default(Message);
bytesConsumed = 0;
return false;
}
message = new Message(payload, messageType);
bytesConsumed = consumedSoFar + 1;
return true;
}
private static bool TryParseType(byte type, out MessageType messageType)
{
switch (type)
{
case TextTypeFlag:
messageType = MessageType.Text;
return true;
case BinaryTypeFlag:
messageType = MessageType.Binary;
return true;
case CloseTypeFlag:
messageType = MessageType.Close;
return true;
case ErrorTypeFlag:
messageType = MessageType.Error;
return true;
default:
messageType = default(MessageType);
return false;
}
}
private static bool TryFormatType(MessageType type, Span<byte> buffer)
{
switch (type)
{
case MessageType.Text:
buffer[0] = TextTypeFlag;
return true;
case MessageType.Binary:
buffer[0] = BinaryTypeFlag;
return true;
case MessageType.Close:
buffer[0] = CloseTypeFlag;
return true;
case MessageType.Error:
buffer[0] = ErrorTypeFlag;
return true;
default:
return false;
}
}
}
}

View File

@ -221,8 +221,8 @@ namespace Microsoft.AspNetCore.Sockets
{
var format =
string.Equals(context.Request.Query["format"], "binary", StringComparison.OrdinalIgnoreCase)
? Format.Binary
: Format.Text;
? MessageType.Binary
: MessageType.Text;
var state = _manager.CreateConnection();
state.Connection.User = context.User;
@ -327,8 +327,8 @@ namespace Microsoft.AspNetCore.Sockets
var format =
string.Equals(context.Request.Query["format"], "binary", StringComparison.OrdinalIgnoreCase)
? Format.Binary
: Format.Text;
? MessageType.Binary
: MessageType.Text;
var message = new Message(
ReadableBuffer.Create(buffer).Preserve(),

View File

@ -1,4 +1,7 @@
using System;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;

View File

@ -137,7 +137,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
}
// Create a Message for the frame
var message = new Message(frame.Payload.Preserve(), effectiveOpcode == WebSocketOpcode.Binary ? Format.Binary : Format.Text, frame.EndOfMessage);
var message = new Message(frame.Payload.Preserve(), effectiveOpcode == WebSocketOpcode.Binary ? MessageType.Binary : MessageType.Text, frame.EndOfMessage);
// Write the message to the channel
return _application.Output.WriteAsync(message);
@ -166,7 +166,7 @@ namespace Microsoft.AspNetCore.Sockets.Transports
{
try
{
var opcode = message.MessageFormat == Format.Binary ?
var opcode = message.Type == MessageType.Binary ?
WebSocketOpcode.Binary :
WebSocketOpcode.Text;

View File

@ -1,4 +1,7 @@
using System;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Tests.Common

View File

@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var transport = new LongPollingTransport(httpClient, loggerFactory);
using (var connection = await ClientConnection.ConnectAsync(new Uri(baseUrl + "/echo"), transport, httpClient, loggerFactory))
{
await connection.SendAsync(Encoding.UTF8.GetBytes(message), Format.Text);
await connection.SendAsync(Encoding.UTF8.GetBytes(message), MessageType.Text);
var receiveData = new ReceiveData();

View File

@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
_loggerFactory.AddConsole();
}
if(Debugger.IsAttached)
if (Debugger.IsAttached)
{
_loggerFactory.AddDebug();
}
@ -67,10 +67,10 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var t = Task.Run(() => host.Start());
Console.WriteLine("Starting test server...");
lifetime = host.Services.GetRequiredService<IApplicationLifetime>();
if(!lifetime.ApplicationStarted.WaitHandle.WaitOne(TimeSpan.FromSeconds(1)))
if (!lifetime.ApplicationStarted.WaitHandle.WaitOne(TimeSpan.FromSeconds(1)))
{
// t probably faulted
if(t.IsFaulted)
if (t.IsFaulted)
{
throw t.Exception.InnerException;
}

View File

@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
stream);
var buffer = ReadableBuffer.Create(stream.ToArray()).Preserve();
await Application.Output.WriteAsync(new Message(buffer, Format.Binary, endOfMessage: true));
await Application.Output.WriteAsync(new Message(buffer, MessageType.Binary, endOfMessage: true));
}
public async Task<T> Read<T>() where T : InvocationMessage

View File

@ -1,4 +1,7 @@
using Microsoft.AspNetCore.SignalR.Tests;
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;

View File

@ -112,7 +112,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var connection = await Connection.ConnectAsync(new Uri("http://fakeuri.org/"), longPollingTransport, httpClient))
{
var data = new byte[] { 1, 1, 2, 3, 5, 8 };
await connection.SendAsync(data, Format.Binary);
await connection.SendAsync(data, MessageType.Binary);
Assert.Equal(data, await sendTcs.Task.OrTimeout());
}
@ -163,7 +163,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
using (var connection = await Connection.ConnectAsync(new Uri("http://fakeuri.org/"), longPollingTransport, httpClient))
{
await connection.StopAsync();
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, Format.Binary));
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
}
@ -217,7 +217,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
allowPollTcs.TrySetResult(null);
await Assert.ThrowsAsync<HttpRequestException>(async () => await receiveTask);
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, Format.Binary));
Assert.False(await connection.SendAsync(new byte[] { 1, 1, 3, 5, 8 }, MessageType.Binary));
}
}

View File

@ -0,0 +1,186 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using Xunit;
namespace Microsoft.AspNetCore.Sockets.Tests
{
public class MessageFormatterTests
{
[Fact]
public void WriteMultipleMessages()
{
const string expectedEncoding = "0:B:;14:T:Hello,\r\nWorld!;1:C:A;12:E:Server Error;";
var messages = new[]
{
CreateMessage(new byte[0]),
CreateMessage("Hello,\r\nWorld!",MessageType.Text),
CreateMessage("A", MessageType.Close),
CreateMessage("Server Error", MessageType.Error)
};
var array = new byte[256];
var buffer = array.Slice();
var totalConsumed = 0;
foreach (var message in messages)
{
Assert.True(MessageFormatter.TryFormatMessage(message, buffer, MessageFormat.Text, out var consumed));
buffer = buffer.Slice(consumed);
totalConsumed += consumed;
}
Assert.Equal(expectedEncoding, Encoding.UTF8.GetString(array, 0, totalConsumed));
}
[Theory]
[InlineData("0:B:;", new byte[0])]
[InlineData("8:B:q83vEg==;", new byte[] { 0xAB, 0xCD, 0xEF, 0x12 })]
public void WriteBinaryMessage(string encoded, byte[] payload)
{
var message = CreateMessage(payload);
var buffer = new byte[256];
Assert.True(MessageFormatter.TryFormatMessage(message, buffer, MessageFormat.Text, out var bytesWritten));
var encodedSpan = buffer.Slice(0, bytesWritten);
Assert.Equal(encoded, Encoding.UTF8.GetString(encodedSpan.ToArray()));
}
[Theory]
[InlineData("0:T:;", MessageType.Text, "")]
[InlineData("3:T:ABC;", MessageType.Text, "ABC")]
[InlineData("11:T:A\nR\rC\r\n;DEF;", MessageType.Text, "A\nR\rC\r\n;DEF")]
[InlineData("0:C:;", MessageType.Close, "")]
[InlineData("17:C:Connection Closed;", MessageType.Close, "Connection Closed")]
[InlineData("0:E:;", MessageType.Error, "")]
[InlineData("12:E:Server Error;", MessageType.Error, "Server Error")]
public void WriteTextMessage(string encoded, MessageType messageType, string payload)
{
var message = CreateMessage(payload, messageType);
var buffer = new byte[256];
Assert.True(MessageFormatter.TryFormatMessage(message, buffer, MessageFormat.Text, out var bytesWritten));
var encodedSpan = buffer.Slice(0, bytesWritten);
Assert.Equal(encoded, Encoding.UTF8.GetString(encodedSpan.ToArray()));
}
[Fact]
public void WriteInvalidMessages()
{
var message = new Message(ReadableBuffer.Create(new byte[0]).Preserve(), MessageType.Binary, endOfMessage: false);
var ex = Assert.Throws<InvalidOperationException>(() =>
MessageFormatter.TryFormatMessage(message, Span<byte>.Empty, MessageFormat.Text, out var written));
Assert.Equal("Cannot format message where endOfMessage is false using this format", ex.Message);
}
[Theory]
[InlineData("0:T:;", MessageType.Text, "")]
[InlineData("3:T:ABC;", MessageType.Text, "ABC")]
[InlineData("11:T:A\nR\rC\r\n;DEF;", MessageType.Text, "A\nR\rC\r\n;DEF")]
[InlineData("0:C:;", MessageType.Close, "")]
[InlineData("17:C:Connection Closed;", MessageType.Close, "Connection Closed")]
[InlineData("0:E:;", MessageType.Error, "")]
[InlineData("12:E:Server Error;", MessageType.Error, "Server Error")]
public void ReadTextMessage(string encoded, MessageType messageType, string payload)
{
var buffer = Encoding.UTF8.GetBytes(encoded);
Assert.True(MessageFormatter.TryParseMessage(buffer, MessageFormat.Text, out var message, out var consumed));
Assert.Equal(consumed, buffer.Length);
AssertMessage(message, messageType, payload);
}
[Theory]
[InlineData("0:B:;", new byte[0])]
[InlineData("8:B:q83vEg==;", new byte[] { 0xAB, 0xCD, 0xEF, 0x12 })]
public void ReadBinaryMessage(string encoded, byte[] payload)
{
var buffer = Encoding.UTF8.GetBytes(encoded);
Assert.True(MessageFormatter.TryParseMessage(buffer, MessageFormat.Text, out var message, out var consumed));
Assert.Equal(consumed, buffer.Length);
AssertMessage(message, MessageType.Binary, payload);
}
[Fact]
public void ReadMultipleMessages()
{
const string encoded = "0:B:;14:T:Hello,\r\nWorld!;1:C:A;12:E:Server Error;";
var buffer = (Span<byte>)Encoding.UTF8.GetBytes(encoded);
var messages = new List<Message>();
var consumedTotal = 0;
while (MessageFormatter.TryParseMessage(buffer, MessageFormat.Text, out var message, out var consumed))
{
messages.Add(message);
consumedTotal += consumed;
buffer = buffer.Slice(consumed);
}
Assert.Equal(consumedTotal, Encoding.UTF8.GetByteCount(encoded));
Assert.Equal(4, messages.Count);
AssertMessage(messages[0], MessageType.Binary, new byte[0]);
AssertMessage(messages[1], MessageType.Text, "Hello,\r\nWorld!");
AssertMessage(messages[2], MessageType.Close, "A");
AssertMessage(messages[3], MessageType.Error, "Server Error");
}
[Theory]
[InlineData("")]
[InlineData("ABC")]
[InlineData("1230450945")]
[InlineData("12ab34:")]
[InlineData("1:asdf")]
[InlineData("1::")]
[InlineData("1:AB:")]
[InlineData("5:T:A")]
[InlineData("5:T:ABCDE")]
[InlineData("5:T:ABCDEF")]
[InlineData("5:X:ABCDEF")]
[InlineData("1029348109238412903849023841290834901283409128349018239048102394:X:ABCDEF")]
public void ReadInvalidMessages(string encoded)
{
var buffer = Encoding.UTF8.GetBytes(encoded);
Assert.False(MessageFormatter.TryParseMessage(buffer, MessageFormat.Text, out var message, out var consumed));
Assert.Equal(0, consumed);
}
private static void AssertMessage(Message message, MessageType messageType, byte[] payload)
{
Assert.True(message.EndOfMessage);
Assert.Equal(messageType, message.Type);
Assert.Equal(payload, message.Payload.Buffer.ToArray());
}
private static void AssertMessage(Message message, MessageType messageType, string payload)
{
Assert.True(message.EndOfMessage);
Assert.Equal(messageType, message.Type);
Assert.Equal(payload, Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
}
private static Message CreateMessage(byte[] payload, MessageType type = MessageType.Binary)
{
return new Message(
ReadableBuffer.Create(payload).Preserve(),
type,
endOfMessage: true);
}
private static Message CreateMessage(string payload, MessageType type)
{
return new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes(payload)).Preserve(),
type,
endOfMessage: true);
}
}
}

View File

@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netcoreapp1.1;net46</TargetFrameworks>
<!-- TODO remove when https://github.com/Microsoft/vstest/issues/428 is resolved -->
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<GenerateBindingRedirectsOutputType>true</GenerateBindingRedirectsOutputType>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.Sockets.Common\Microsoft.AspNetCore.Sockets.Common.csproj" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0-*" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0-*" />
<PackageReference Include="xunit" Version="2.2.0-*" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
</Project>

View File

@ -254,7 +254,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
// Write to the transport so the poll yields
await state.Connection.Transport.Output.WriteAsync(new Message(buffer, Format.Text, endOfMessage: true));
await state.Connection.Transport.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));
await task;
@ -279,7 +279,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
// Write to the application
await state.Application.Output.WriteAsync(new Message(buffer, Format.Text, endOfMessage: true));
await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));
await task;
@ -304,7 +304,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve();
// Write to the application
await state.Application.Output.WriteAsync(new Message(buffer, Format.Text, endOfMessage: true));
await state.Application.Output.WriteAsync(new Message(buffer, MessageType.Text, endOfMessage: true));
await task;

View File

@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await channel.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(),
Format.Text,
MessageType.Text,
endOfMessage: true));
Assert.True(channel.Out.TryComplete());

View File

@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await channel.Out.WriteAsync(new Message(
ReadableBuffer.Create(Encoding.UTF8.GetBytes("Hello World")).Preserve(),
Format.Text,
MessageType.Text,
endOfMessage: true));
Assert.True(channel.Out.TryComplete());

View File

@ -18,9 +18,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public class WebSocketsTests
{
[Theory]
[InlineData(Format.Text, WebSocketOpcode.Text)]
[InlineData(Format.Binary, WebSocketOpcode.Binary)]
public async Task ReceivedFramesAreWrittenToChannel(Format format, WebSocketOpcode opcode)
[InlineData(MessageType.Text, WebSocketOpcode.Text)]
[InlineData(MessageType.Binary, WebSocketOpcode.Binary)]
public async Task ReceivedFramesAreWrittenToChannel(MessageType format, WebSocketOpcode opcode)
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -49,7 +49,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var message = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.MessageFormat);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
}
@ -66,9 +66,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}
[Theory]
[InlineData(Format.Text, WebSocketOpcode.Text)]
[InlineData(Format.Binary, WebSocketOpcode.Binary)]
public async Task MultiFrameMessagesArePropagatedToTheChannel(Format format, WebSocketOpcode opcode)
[InlineData(MessageType.Text, WebSocketOpcode.Text)]
[InlineData(MessageType.Binary, WebSocketOpcode.Binary)]
public async Task MultiFrameMessagesArePropagatedToTheChannel(MessageType format, WebSocketOpcode opcode)
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -101,14 +101,14 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var message1 = await applicationSide.Input.In.ReadAsync())
{
Assert.False(message1.EndOfMessage);
Assert.Equal(format, message1.MessageFormat);
Assert.Equal(format, message1.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message1.Payload.Buffer.ToArray()));
}
using (var message2 = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message2.EndOfMessage);
Assert.Equal(format, message2.MessageFormat);
Assert.Equal(format, message2.Type);
Assert.Equal("World", Encoding.UTF8.GetString(message2.Payload.Buffer.ToArray()));
}
@ -125,9 +125,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}
[Theory]
[InlineData(Format.Text, WebSocketOpcode.Text)]
[InlineData(Format.Binary, WebSocketOpcode.Binary)]
public async Task IncompleteMessagesAreWrittenAsMultiFrameWebSocketMessages(Format format, WebSocketOpcode opcode)
[InlineData(MessageType.Text, WebSocketOpcode.Text)]
[InlineData(MessageType.Binary, WebSocketOpcode.Binary)]
public async Task IncompleteMessagesAreWrittenAsMultiFrameWebSocketMessages(MessageType format, WebSocketOpcode opcode)
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -173,9 +173,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}
[Theory]
[InlineData(Format.Text, WebSocketOpcode.Text)]
[InlineData(Format.Binary, WebSocketOpcode.Binary)]
public async Task DataWrittenToOutputPipelineAreSentAsFrames(Format format, WebSocketOpcode opcode)
[InlineData(MessageType.Text, WebSocketOpcode.Text)]
[InlineData(MessageType.Binary, WebSocketOpcode.Binary)]
public async Task DataWrittenToOutputPipelineAreSentAsFrames(MessageType format, WebSocketOpcode opcode)
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -214,9 +214,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}
[Theory]
[InlineData(Format.Text, WebSocketOpcode.Text)]
[InlineData(Format.Binary, WebSocketOpcode.Binary)]
public async Task FrameReceivedAfterServerCloseSent(Format format, WebSocketOpcode opcode)
[InlineData(MessageType.Text, WebSocketOpcode.Text)]
[InlineData(MessageType.Binary, WebSocketOpcode.Binary)]
public async Task FrameReceivedAfterServerCloseSent(MessageType format, WebSocketOpcode opcode)
{
var transportToApplication = Channel.CreateUnbounded<Message>();
var applicationToTransport = Channel.CreateUnbounded<Message>();
@ -250,7 +250,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
using (var message = await applicationSide.Input.In.ReadAsync())
{
Assert.True(message.EndOfMessage);
Assert.Equal(format, message.MessageFormat);
Assert.Equal(format, message.Type);
Assert.Equal("Hello", Encoding.UTF8.GetString(message.Payload.Buffer.ToArray()));
}