Merge pull request #8 from anurse/anurse/websockets

move prototype WebSockets-over-Channels code in
This commit is contained in:
David Fowler 2016-10-17 16:21:07 -07:00 committed by GitHub
commit 79c1781ae3
24 changed files with 1722 additions and 0 deletions

2
.gitignore vendored
View File

@ -31,3 +31,5 @@ runtimes/
.testPublish/
launchSettings.json
*.tmp
*.nuget.props
*.nuget.targets

View File

@ -23,6 +23,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{6A35B453-5
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.AspNetCore.Sockets.Tests", "test\Microsoft.AspNetCore.Sockets.Tests\Microsoft.AspNetCore.Sockets.Tests.xproj", "{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.Extensions.WebSockets.Internal", "src\Microsoft.Extensions.WebSockets.Internal\Microsoft.Extensions.WebSockets.Internal.xproj", "{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.Extensions.WebSockets.Internal.Tests", "test\Microsoft.Extensions.WebSockets.Internal.Tests\Microsoft.Extensions.WebSockets.Internal.Tests.xproj", "{A7050BAE-3DB9-4FB3-A49D-303201415B13}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -45,6 +49,14 @@ Global
{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9}.Release|Any CPU.Build.0 = Release|Any CPU
{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775}.Release|Any CPU.Build.0 = Release|Any CPU
{A7050BAE-3DB9-4FB3-A49D-303201415B13}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A7050BAE-3DB9-4FB3-A49D-303201415B13}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7050BAE-3DB9-4FB3-A49D-303201415B13}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7050BAE-3DB9-4FB3-A49D-303201415B13}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -54,5 +66,7 @@ Global
{1715EA8D-8E13-4ACF-8BCA-57D048E55ED8} = {DA69F624-5398-4884-87E4-B816698CDE65}
{BA99C2A1-48F9-4FA5-B95A-9687A73B7CC9} = {C4BC9889-B49F-41B6-806B-F84941B2549B}
{AAD719D5-5E31-4ED1-A60F-6EB92EFA66D9} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
{5D9DA986-2EAB-4C6D-BF15-9A4BDD4DE775} = {DA69F624-5398-4884-87E4-B816698CDE65}
{A7050BAE-3DB9-4FB3-A49D-303201415B13} = {6A35B453-52EC-48AF-89CA-D4A69800F131}
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,50 @@
using System.Threading;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
public static class ChannelExtensions
{
public static ValueTask<ChannelReadResult> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes) => ReadAtLeastAsync(input, minimumRequiredBytes, CancellationToken.None);
// TODO: Pull this up to Channels. We should be able to do it there without allocating a Task<T> in any case (rather than here where we can avoid allocation
// only if the buffer is already ready and has enough data.
public static ValueTask<ChannelReadResult> ReadAtLeastAsync(this IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var awaiter = input.ReadAsync(/* cancellationToken */);
// Short-cut path!
if (awaiter.IsCompleted)
{
// We have a buffer, is it big enough?
var result = awaiter.GetResult();
if (result.IsCompleted || result.Buffer.Length >= minimumRequiredBytes)
{
return new ValueTask<ChannelReadResult>(result);
}
// Buffer wasn't big enough, mark it as examined and continue to the "slow" path below
input.Advance(
consumed: result.Buffer.Start,
examined: result.Buffer.End);
}
return new ValueTask<ChannelReadResult>(ReadAtLeastSlowAsync(awaiter, input, minimumRequiredBytes, cancellationToken));
}
private static async Task<ChannelReadResult> ReadAtLeastSlowAsync(ReadableChannelAwaitable awaitable, IReadableChannel input, int minimumRequiredBytes, CancellationToken cancellationToken)
{
var result = await awaitable;
while (!result.IsCompleted && result.Buffer.Length < minimumRequiredBytes)
{
cancellationToken.ThrowIfCancellationRequested();
input.Advance(
consumed: result.Buffer.Start,
examined: result.Buffer.End);
result = await input.ReadAsync(/* cancelToken */);
}
return result;
}
}
}

View File

@ -0,0 +1,104 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Represents a connection to a WebSocket endpoint.
/// </summary>
/// <remarks>
/// <para>
/// Implementors of this type are generally considered thread-safe under the following condition: No two threads attempt to call either
/// <see cref="ReceiveAsync"/> or <see cref="SendAsync"/> simultaneously. Different threads may call each method, but the same method
/// cannot be re-entered while it is being run in a different thread. However, ensure you verify that the specific implementor is
/// thread-safe in this way. For example, <see cref="WebSocketConnection"/> (including the implementations returned by the
/// static factory methods on that type) is thread-safe in this way.
/// </para>
/// <para>
/// The general pattern of having a single thread running <see cref="ReceiveAsync"/> and a separate thread running <see cref="SendAsync"/> will
/// be thread-safe, as each method interacts with completely separate state.
/// </para>
/// </remarks>
public interface IWebSocketConnection : IDisposable
{
WebSocketConnectionState State { get; }
/// <summary>
/// Sends the specified frame.
/// </summary>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that indicates when/if the send is cancelled.</param>
/// <returns>A <see cref="Task"/> that completes when the message has been written to the outbound stream.</returns>
Task SendAsync(WebSocketFrame message, CancellationToken cancellationToken);
/// <summary>
/// Sends a Close frame to the other party. This does not guarantee that the client will send a responding close frame.
/// </summary>
/// <remarks>
/// If the other party does not respond with a close frame, the connection will remain open and the <see cref="Task{WebSocketCloseResult}"/>
/// will remain active. Call the <see cref="IDisposable.Dispose"/> method on this instance to forcibly terminate the connection.
/// </remarks>
/// <param name="result">A <see cref="WebSocketCloseResult"/> with the payload for the close frame</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that indicates when/if the send is cancelled.</param>
/// <returns>A <see cref="Task"/> that completes when the close frame has been sent</returns>
Task CloseAsync(WebSocketCloseResult result, CancellationToken cancellationToken);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <param name="state">A state parameter that will be passed to each invocation of <paramref name="messageHandler"/></param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, object, Task> messageHandler, object state);
}
public static class WebSocketConnectionExtensions
{
/// <summary>
/// Sends the specified frame.
/// </summary>
/// <param name="message">The message to send.</param>
/// <returns>A <see cref="Task"/> that completes when the message has been written to the outbound stream.</returns>
public static Task SendAsync(this IWebSocketConnection self, WebSocketFrame message) => self.SendAsync(message, CancellationToken.None);
/// <summary>
/// Sends a Close frame to the other party. This does not guarantee that the client will send a responding close frame.
/// </summary>
/// <param name="result">A <see cref="WebSocketCloseResult"/> with the payload for the close frame</param>
/// <returns>A <see cref="Task"/> that completes when the close frame has been sent</returns>
public static Task CloseAsync(this IWebSocketConnection self, WebSocketCloseResult result) => self.CloseAsync(result, CancellationToken.None);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Action<WebSocketFrame> messageHandler) =>
self.ExecuteAsync((frame, _) =>
{
messageHandler(frame);
return Task.CompletedTask;
}, null);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Action<WebSocketFrame, object> messageHandler, object state) =>
self.ExecuteAsync((frame, s) =>
{
messageHandler(frame, s);
return Task.CompletedTask;
}, state);
/// <summary>
/// Runs the WebSocket receive loop, using the provided message handler.
/// </summary>
/// <param name="messageHandler">The callback that will be invoked for each new frame</param>
/// <returns>A <see cref="Task{WebSocketCloseResult}"/> that will complete when the client has sent a close frame, or the connection has been terminated</returns>
public static Task<WebSocketCloseResult> ExecuteAsync(this IWebSocketConnection self, Func<WebSocketFrame, Task> messageHandler) =>
self.ExecuteAsync((frame, _) => messageHandler(frame), null);
}
}

View File

@ -0,0 +1,52 @@
using System;
using System.Binary;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
internal static class MaskingUtilities
{
// Plenty of optimization to be done here but not our immediate priority right now.
// Including: Vectorization, striding by uints (even when not vectorized; we'd probably flip the
// overload that does the implementation in that case and do it in the uint version).
public static void ApplyMask(ref ReadableBuffer payload, uint maskingKey)
{
unsafe
{
// Write the masking key as bytes to simplify access. Use a stackalloc buffer because it's fixed-size
var maskingKeyBytes = stackalloc byte[4];
var maskingKeySpan = new Span<byte>(maskingKeyBytes, 4);
maskingKeySpan.WriteBigEndian(maskingKey);
ApplyMask(ref payload, maskingKeySpan);
}
}
public static void ApplyMask(ref ReadableBuffer payload, Span<byte> maskingKey)
{
var offset = 0;
foreach (var mem in payload)
{
var span = mem.Span;
ApplyMask(span, maskingKey, ref offset);
offset += span.Length;
}
}
public static void ApplyMask(Span<byte> payload, Span<byte> maskingKey)
{
var i = 0;
ApplyMask(payload, maskingKey, ref i);
}
private static void ApplyMask(Span<byte> payload, Span<byte> maskingKey, ref int maskingKeyOffset)
{
for (int i = 0; i < payload.Length; i++)
{
payload[i] = (byte)(payload[i] ^ maskingKey[maskingKeyOffset % 4]);
maskingKeyOffset++;
}
}
}
}

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>5d9da986-2eab-4c6d-bf15-9a4bdd4de775</ProjectGuid>
<RootNamespace>Microsoft.Extensions.WebSockets</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>

View File

@ -0,0 +1,12 @@
using System.Reflection;
using System.Runtime.CompilerServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Microsoft.Extensions.WebSockets")]
[assembly: AssemblyTrademark("")]
[assembly: InternalsVisibleTo("Microsoft.Extensions.WebSockets.Tests")]

View File

@ -0,0 +1,71 @@
using System.Binary;
using System.Text;
using Channels;
using Channels.Text.Primitives;
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Represents the payload of a Close frame (i.e. a <see cref="WebSocketFrame"/> with an <see cref="WebSocketFrame.Opcode"/> of <see cref="WebSocketOpcode.Close"/>).
/// </summary>
public struct WebSocketCloseResult
{
internal static WebSocketCloseResult AbnormalClosure = new WebSocketCloseResult(WebSocketCloseStatus.AbnormalClosure, "Underlying transport connection was terminated");
internal static WebSocketCloseResult Empty = new WebSocketCloseResult(WebSocketCloseStatus.Empty);
/// <summary>
/// Gets the close status code specified in the frame.
/// </summary>
public WebSocketCloseStatus Status { get; }
/// <summary>
/// Gets the close status description specified in the frame.
/// </summary>
public string Description { get; }
public WebSocketCloseResult(WebSocketCloseStatus status) : this(status, string.Empty) { }
public WebSocketCloseResult(WebSocketCloseStatus status, string description)
{
Status = status;
Description = description;
}
public int GetSize() => Encoding.UTF8.GetByteCount(Description) + sizeof(ushort);
public static bool TryParse(ReadableBuffer payload, out WebSocketCloseResult result)
{
if (payload.Length == 0)
{
// Empty payload is OK
result = new WebSocketCloseResult(WebSocketCloseStatus.Empty, string.Empty);
return true;
}
else if (payload.Length < 2)
{
result = default(WebSocketCloseResult);
return false;
}
else
{
var status = payload.ReadBigEndian<ushort>();
var description = string.Empty;
payload = payload.Slice(2);
if (payload.Length > 0)
{
description = payload.GetUtf8String();
}
result = new WebSocketCloseResult((WebSocketCloseStatus)status, description);
return true;
}
}
public void WriteTo(ref WritableBuffer buffer)
{
buffer.WriteBigEndian((ushort)Status);
if (!string.IsNullOrEmpty(Description))
{
buffer.WriteUtf8String(Description);
}
}
}
}

View File

@ -0,0 +1,74 @@
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Represents well-known WebSocket Close frame status codes.
/// </summary>
/// <remarks>
/// See https://tools.ietf.org/html/rfc6455#section-7.4 for details
/// </remarks>
public enum WebSocketCloseStatus : ushort
{
/// <summary>
/// Indicates that the purpose for the connection was fulfilled and thus the connection was closed normally.
/// </summary>
NormalClosure = 1000,
/// <summary>
/// Indicates that the other endpoint is going away, such as a server shutting down or a browser navigating to a new page.
/// </summary>
EndpointUnavailable = 1001,
/// <summary>
/// Indicates that a protocol error has occurred, causing the connection to be terminated.
/// </summary>
ProtocolError = 1002,
/// <summary>
/// Indicates an invalid message type was received. For example, if the end point only supports <see cref="WebSocketOpcode.Text"/> messages
/// but received a <see cref="WebSocketOpcode.Binary"/> message.
/// </summary>
InvalidMessageType = 1003,
/// <summary>
/// Indicates that the Close frame did not have a status code. Not used in actual transmission.
/// </summary>
Empty = 1005,
/// <summary>
/// Indicates that the underlying transport connection was terminated without a proper close handshake. Not used in actual transmission.
/// </summary>
AbnormalClosure = 1006,
/// <summary>
/// Indicates that an invalid payload was encountered. For example, a frame of type <see cref="WebSocketOpcode.Text"/> contained non-UTF-8 data.
/// </summary>
InvalidPayloadData = 1007,
/// <summary>
/// Indicates that the connection is being terminated due to a violation of policy. This is a generic error code used whenever a party needs to terminate
/// a connection without disclosing the specific reason.
/// </summary>
PolicyViolation = 1008,
/// <summary>
/// Indicates that the connection is being terminated due to an endpoint receiving a message that is too large.
/// </summary>
MessageTooBig = 1009,
/// <summary>
/// Indicates that the connection is being terminated due to being unable to negotiate a mandatory extension with the other party. Usually sent
/// from the client to the server after the client finishes handshaking without negotiating the extension.
/// </summary>
MandatoryExtension = 1010,
/// <summary>
/// Indicates that a server is terminating the connection due to an internal error.
/// </summary>
InternalServerError = 1011,
/// <summary>
/// Indicates that the connection failed to establish because the TLS handshake failed. Not used in actual transmission.
/// </summary>
TLSHandshakeFailed = 1015
}
}

View File

@ -0,0 +1,456 @@
using System;
using System.Binary;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Provides the default implementation of <see cref="IWebSocketConnection"/>.
/// </summary>
/// <remarks>
/// <para>
/// This type is thread-safe under the following condition: No two threads attempt to call either
/// <see cref="ReceiveAsync"/> or <see cref="SendAsync"/> simultaneously. Different threads may call each method, but the same method
/// cannot be re-entered while it is being run in a different thread.
/// </para>
/// <para>
/// The general pattern of having a single thread running <see cref="ReceiveAsync"/> and a separate thread running <see cref="SendAsync"/> will
/// be thread-safe, as each method interacts with completely separate state.
/// </para>
/// </remarks>
public class WebSocketConnection : IWebSocketConnection
{
private readonly RandomNumberGenerator _random;
private readonly byte[] _maskingKey;
private readonly IReadableChannel _inbound;
private readonly IWritableChannel _outbound;
private readonly CancellationTokenSource _terminateReceiveCts = new CancellationTokenSource();
public WebSocketConnectionState State { get; private set; } = WebSocketConnectionState.Created;
/// <summary>
/// Constructs a new, unmasked, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound) : this(inbound, outbound, masked: false) { }
/// <summary>
/// Constructs a new, optionally masked, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
/// <param name="masked">A boolean indicating if frames sent from this socket should be masked (the masking key is automatically generated)</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound, bool masked)
{
_inbound = inbound;
_outbound = outbound;
if (masked)
{
_maskingKey = new byte[4];
_random = RandomNumberGenerator.Create();
}
}
/// <summary>
/// Constructs a new, fixed masking-key, <see cref="WebSocketConnection"/> from an <see cref="IReadableChannel"/> and an <see cref="IWritableChannel"/> that represents an established WebSocket connection (i.e. after handshaking)
/// </summary>
/// <param name="inbound">A <see cref="IReadableChannel"/> from which frames will be read when receiving.</param>
/// <param name="outbound">A <see cref="IWritableChannel"/> to which frame will be written when sending.</param>
/// <param name="fixedMaskingKey">The masking key to use for the connection. Must be exactly 4-bytes long. This is ONLY recommended for testing and development purposes.</param>
public WebSocketConnection(IReadableChannel inbound, IWritableChannel outbound, byte[] fixedMaskingKey)
{
_inbound = inbound;
_outbound = outbound;
_maskingKey = fixedMaskingKey;
}
public void Dispose()
{
State = WebSocketConnectionState.Closed;
_inbound.Complete();
_outbound.Complete();
_terminateReceiveCts.Cancel();
}
public Task<WebSocketCloseResult> ExecuteAsync(Func<WebSocketFrame, object, Task> messageHandler, object state)
{
if (State == WebSocketConnectionState.Closed)
{
throw new ObjectDisposedException(nameof(WebSocketConnection));
}
if (State != WebSocketConnectionState.Created)
{
throw new InvalidOperationException("Connection is already running.");
}
State = WebSocketConnectionState.Connected;
return ReceiveLoop(messageHandler, state, _terminateReceiveCts.Token);
}
/// <summary>
/// Sends the specified frame.
/// </summary>
/// <param name="frame">The frame to send.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that indicates when/if the send is cancelled.</param>
/// <returns>A <see cref="Task"/> that completes when the message has been written to the outbound stream.</returns>
// TODO: De-taskify this to allow consumers to create their own awaiter.
public Task SendAsync(WebSocketFrame frame, CancellationToken cancellationToken)
{
if (State == WebSocketConnectionState.Closed)
{
throw new ObjectDisposedException(nameof(WebSocketConnection));
}
// This clause is a bit of an artificial restriction to ensure people run "Execute". Maybe we don't care?
else if (State == WebSocketConnectionState.Created)
{
throw new InvalidOperationException("Cannot send until the connection is started using Execute");
}
else if (State == WebSocketConnectionState.CloseSent)
{
throw new InvalidOperationException("Cannot send after sending a Close frame");
}
if (frame.Opcode == WebSocketOpcode.Close)
{
throw new InvalidOperationException("Cannot use SendAsync to send a Close frame, use CloseAsync instead.");
}
return SendCoreAsync(frame, null, cancellationToken);
}
/// <summary>
/// Sends a Close frame to the other party. This does not guarantee that the client will send a responding close frame.
/// </summary>
/// <remarks>
/// If the other party does not respond with a close frame, the connection will remain open and the <see cref="Task{WebSocketCloseResult}"/>
/// will remain active. Call the <see cref="IDisposable.Dispose"/> method on this instance to forcibly terminate the connection.
/// </remarks>
/// <param name="result">A <see cref="WebSocketCloseResult"/> with the payload for the close frame</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that indicates when/if the send is cancelled.</param>
/// <returns>A <see cref="Task"/> that completes when the close frame has been sent</returns>
public async Task CloseAsync(WebSocketCloseResult result, CancellationToken cancellationToken)
{
if (State == WebSocketConnectionState.Closed)
{
throw new ObjectDisposedException(nameof(WebSocketConnection));
}
else if (State == WebSocketConnectionState.Created)
{
throw new InvalidOperationException("Cannot send close frame when the connection hasn't been started");
}
else if (State == WebSocketConnectionState.CloseSent)
{
throw new InvalidOperationException("Cannot send multiple close frames");
}
// When we pass a close result to SendCoreAsync, the frame is only used for the header and the payload is ignored
var frame = new WebSocketFrame(endOfMessage: true, opcode: WebSocketOpcode.Close, payload: default(ReadableBuffer));
await SendCoreAsync(frame, result, cancellationToken);
if (State == WebSocketConnectionState.CloseReceived)
{
State = WebSocketConnectionState.Closed;
}
else
{
State = WebSocketConnectionState.CloseSent;
}
}
private void WriteMaskingKey(Span<byte> buffer)
{
if (_random != null)
{
// Get a new random mask
// Until https://github.com/dotnet/corefx/issues/12323 is fixed we need to use this shared buffer and copy model
// Once we have that fix we should be able to generate the mask directly into the output buffer.
_random.GetBytes(_maskingKey);
}
buffer.Set(_maskingKey);
}
private async Task<WebSocketCloseResult> ReceiveLoop(Func<WebSocketFrame, object, Task> messageHandler, object state, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
// WebSocket Frame layout (https://tools.ietf.org/html/rfc6455#section-5.2):
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+
// Read at least 2 bytes
var result = await _inbound.ReadAtLeastAsync(2, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
if (result.IsCompleted && result.Buffer.Length < 2)
{
return WebSocketCloseResult.AbnormalClosure;
}
var buffer = result.Buffer;
// Read the opcode
var opcodeByte = buffer.ReadBigEndian<byte>();
buffer = buffer.Slice(1);
var fin = (opcodeByte & 0x01) != 0;
var opcode = (WebSocketOpcode)((opcodeByte & 0xF0) >> 4);
// Read the first byte of the payload length
var lenByte = buffer.ReadBigEndian<byte>();
buffer = buffer.Slice(1);
var masked = (lenByte & 0x01) != 0;
var payloadLen = (lenByte & 0xFE) >> 1;
// Mark what we've got so far as consumed
_inbound.Advance(buffer.Start);
// Calculate the rest of the header length
var headerLength = masked ? 4 : 0;
if (payloadLen == 126)
{
headerLength += 2;
}
else if (payloadLen == 127)
{
headerLength += 4;
}
uint maskingKey = 0;
if (headerLength > 0)
{
result = await _inbound.ReadAtLeastAsync(headerLength, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
if (result.IsCompleted && result.Buffer.Length < headerLength)
{
return WebSocketCloseResult.AbnormalClosure;
}
buffer = result.Buffer;
// Read extended payload length (if any)
if (payloadLen == 126)
{
payloadLen = buffer.ReadBigEndian<ushort>();
buffer = buffer.Slice(sizeof(ushort));
}
else if (payloadLen == 127)
{
var longLen = buffer.ReadBigEndian<ulong>();
buffer = buffer.Slice(sizeof(ulong));
if (longLen > int.MaxValue)
{
throw new WebSocketException($"Frame is too large. Maximum frame size is {int.MaxValue} bytes");
}
payloadLen = (int)longLen;
}
// Read masking key
if (masked)
{
var maskingKeyStart = buffer.Start;
maskingKey = buffer.Slice(0, 4).ReadBigEndian<uint>();
buffer = buffer.Slice(4);
}
// Mark the length and masking key consumed
_inbound.Advance(buffer.Start);
}
var payload = default(ReadableBuffer);
if (payloadLen > 0)
{
result = await _inbound.ReadAtLeastAsync(payloadLen, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
if (result.IsCompleted && result.Buffer.Length < payloadLen)
{
return WebSocketCloseResult.AbnormalClosure;
}
buffer = result.Buffer;
payload = buffer.Slice(0, payloadLen);
if (masked)
{
// Unmask
MaskingUtilities.ApplyMask(ref payload, maskingKey);
}
}
// Run the callback, if we're not cancelled.
cancellationToken.ThrowIfCancellationRequested();
var frame = new WebSocketFrame(fin, opcode, payload);
if (frame.Opcode == WebSocketOpcode.Close)
{
return HandleCloseFrame(payloadLen, payload, frame);
}
else
{
await messageHandler(frame, state);
}
// Mark the payload as consumed
if (payloadLen > 0)
{
_inbound.Advance(payload.End);
}
}
return WebSocketCloseResult.AbnormalClosure;
}
private WebSocketCloseResult HandleCloseFrame(int payloadLen, ReadableBuffer payload, WebSocketFrame frame)
{
// Update state
if (State == WebSocketConnectionState.CloseSent)
{
State = WebSocketConnectionState.Closed;
}
else
{
State = WebSocketConnectionState.CloseReceived;
}
// Process the close frame
WebSocketCloseResult closeResult;
if (!WebSocketCloseResult.TryParse(frame.Payload, out closeResult))
{
closeResult = WebSocketCloseResult.Empty;
}
// Make the payload as consumed
if (payloadLen > 0)
{
_inbound.Advance(payload.End);
}
return closeResult;
}
private Task SendCoreAsync(WebSocketFrame message, WebSocketCloseResult? closeResult, CancellationToken cancellationToken)
{
// Base header size is 2 bytes.
var allocSize = 2;
var payloadLength = closeResult == null ? message.Payload.Length : closeResult.Value.GetSize();
if (payloadLength > ushort.MaxValue)
{
// We're going to need an 8-byte length
allocSize += 8;
}
else if (payloadLength > 125)
{
// We're going to need a 2-byte length
allocSize += 2;
}
if (_maskingKey != null)
{
// We need space for the masking key
allocSize += 4;
}
if (closeResult != null)
{
// We need space for the close result payload too
allocSize += payloadLength;
}
// Allocate a buffer
var buffer = _outbound.Alloc(minimumSize: allocSize);
Debug.Assert(buffer.Memory.Length >= allocSize);
if (buffer.Memory.Length < allocSize)
{
throw new InvalidOperationException("Couldn't allocate enough data from the channel to write the header");
}
// Write the opcode and FIN flag
var opcodeByte = (byte)((int)message.Opcode << 4);
if (message.EndOfMessage)
{
opcodeByte |= 1;
}
buffer.WriteBigEndian(opcodeByte);
// Write the length and mask flag
var maskingByte = _maskingKey != null ? 0x01 : 0x00; // TODO: Masking flag goes here
if (payloadLength > ushort.MaxValue)
{
buffer.WriteBigEndian((byte)(0xFE | maskingByte));
// 8-byte length
buffer.WriteBigEndian((ulong)payloadLength);
}
else if (payloadLength > 125)
{
buffer.WriteBigEndian((byte)(0xFC | maskingByte));
// 2-byte length
buffer.WriteBigEndian((ushort)payloadLength);
}
else
{
// 1-byte length
buffer.WriteBigEndian((byte)((payloadLength << 1) | maskingByte));
}
var maskingKey = Span<byte>.Empty;
if (_maskingKey != null)
{
// Get a span of the output buffer for the masking key, write it there, then advance the write head.
maskingKey = buffer.Memory.Slice(0, 4).Span;
WriteMaskingKey(maskingKey);
buffer.Advance(4);
}
if (closeResult != null)
{
// Write the close payload out
var payload = buffer.Memory.Slice(0, payloadLength).Span;
closeResult.Value.WriteTo(ref buffer);
if (_maskingKey != null)
{
MaskingUtilities.ApplyMask(payload, maskingKey);
}
}
else
{
// This will copy the actual buffer struct, but NOT the underlying data
// We need a field so we can by-ref it.
var payload = message.Payload;
if (_maskingKey != null)
{
// Mask the payload in it's own buffer
MaskingUtilities.ApplyMask(ref payload, maskingKey);
}
// Append the (masked) buffer to the output channel
buffer.Append(payload);
}
// Commit and Flush
return buffer.FlushAsync();
}
}
}

View File

@ -0,0 +1,11 @@
namespace Microsoft.Extensions.WebSockets.Internal
{
public enum WebSocketConnectionState
{
Created,
Connected,
CloseSent,
CloseReceived,
Closed
}
}

View File

@ -0,0 +1,19 @@
using System;
namespace Microsoft.Extensions.WebSockets.Internal
{
public class WebSocketException : Exception
{
public WebSocketException()
{
}
public WebSocketException(string message) : base(message)
{
}
public WebSocketException(string message, Exception innerException) : base(message, innerException)
{
}
}
}

View File

@ -0,0 +1,32 @@
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Represents a single Frame received or sent on a <see cref="IWebSocketConnection"/>.
/// </summary>
public struct WebSocketFrame
{
/// <summary>
/// Indicates if the "FIN" flag is set on this frame, which indicates it is the final frame of a message.
/// </summary>
public bool EndOfMessage { get; }
/// <summary>
/// Gets the <see cref="WebSocketOpcode"/> value describing the opcode of the WebSocket frame.
/// </summary>
public WebSocketOpcode Opcode { get; }
/// <summary>
/// Gets the payload of the WebSocket frame.
/// </summary>
public ReadableBuffer Payload { get; }
public WebSocketFrame(bool endOfMessage, WebSocketOpcode opcode, ReadableBuffer payload)
{
EndOfMessage = endOfMessage;
Opcode = opcode;
Payload = payload;
}
}
}

View File

@ -0,0 +1,42 @@
namespace Microsoft.Extensions.WebSockets.Internal
{
/// <summary>
/// Represents the possible values for the "opcode" field of a WebSocket frame.
/// </summary>
public enum WebSocketOpcode
{
/// <summary>
/// Indicates that the frame is a continuation of the previous <see cref="Text"/> or <see cref="Binary"/> frame.
/// </summary>
Continuation = 0x0,
/// <summary>
/// Indicates that the frame is the first frame of a new Text message, formatted in UTF-8.
/// </summary>
Text = 0x1,
/// <summary>
/// Indicates that the frame is the first frame of a new Binary message.
/// </summary>
Binary = 0x2,
/* 0x3 - 0x7 are reserved */
/// <summary>
/// Indicates that the frame is a notification that the sender is closing their end of the connection
/// </summary>
Close = 0x8,
/// <summary>
/// Indicates a request from the sender to receive a <see cref="Pong"/>, in order to maintain the connection.
/// </summary>
Ping = 0x9,
/// <summary>
/// Indicates a response to a <see cref="Ping"/>, in order to maintain the connection.
/// </summary>
Pong = 0xA,
/* 0xB-0xF are reserved */
/* all opcodes above 0xF are invalid */
}
}

View File

@ -0,0 +1,34 @@
{
"version": "0.1.0-*",
"buildOptions": {
"warningsAsErrors": true,
"allowUnsafe": true
},
"description": "Low-allocation Push-oriented WebSockets based on Channels",
"packOptions": {
"repository": {
"type": "git",
"url": "git://github.com/aspnet/websockets"
}
},
"dependencies": {
"Channels": "0.2.0-beta-*",
"Channels.Text.Primitives": "0.2.0-beta-*"
},
"frameworks": {
"net46": {},
"netstandard1.3": {
"dependencies": {
"System.Collections": "4.0.11",
"System.Diagnostics.Debug": "4.0.11",
"System.IO": "4.1.0",
"System.Linq": "4.1.0",
"System.Runtime": "4.1.0",
"System.Runtime.Extensions": "4.1.0",
"System.Threading": "4.0.11",
"System.Threading.Tasks": "4.0.11"
}
}
}
}

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0.25420" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0.25420</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>a7050bae-3db9-4fb3-a49d-303201415b13</ProjectGuid>
<RootNamespace>Microsoft.Extensions.WebSockets.Internal.Tests</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>

View File

@ -0,0 +1,19 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Microsoft.Extensions.WebSockets.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("216f6739-da4d-4371-8393-739a90826c29")]

View File

@ -0,0 +1,24 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public static class WebSocketConnectionExtensions
{
public static async Task<WebSocketConnectionSummary> ExecuteAndCaptureFramesAsync(this IWebSocketConnection self)
{
var frames = new List<WebSocketFrame>();
var closeResult = await self.ExecuteAsync(frame =>
{
var buffer = new byte[frame.Payload.Length];
frame.Payload.CopyTo(buffer);
frames.Add(new WebSocketFrame(
frame.EndOfMessage,
frame.Opcode,
ReadableBuffer.Create(buffer, 0, buffer.Length)));
});
return new WebSocketConnectionSummary(frames, closeResult);
}
}
}

View File

@ -0,0 +1,16 @@
using System.Collections.Generic;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public class WebSocketConnectionSummary
{
public IList<WebSocketFrame> Received { get; }
public WebSocketCloseResult CloseResult { get; }
public WebSocketConnectionSummary(IList<WebSocketFrame> received, WebSocketCloseResult closeResult)
{
Received = received;
CloseResult = closeResult;
}
}
}

View File

@ -0,0 +1,131 @@
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public partial class WebSocketConnectionTests
{
[Fact]
public async Task SendReceiveFrames()
{
using (var pair = WebSocketPair.Create())
{
var cts = new CancellationTokenSource();
if (!Debugger.IsAttached)
{
cts.CancelAfter(TimeSpan.FromSeconds(5));
}
using (cts.Token.Register(() => pair.Dispose()))
{
var client = pair.ClientSocket.ExecuteAsync(_ =>
{
Assert.False(true, "did not expect the client to receive any frames!");
return Task.CompletedTask;
});
// Send Frames
await pair.ClientSocket.SendAsync(CreateTextFrame("Hello"));
await pair.ClientSocket.SendAsync(CreateTextFrame("World"));
await pair.ClientSocket.SendAsync(CreateBinaryFrame(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }));
await pair.ClientSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure));
var summary = await pair.ServerSocket.ExecuteAndCaptureFramesAsync();
Assert.Equal(3, summary.Received.Count);
Assert.Equal("Hello", Encoding.UTF8.GetString(summary.Received[0].Payload.ToArray()));
Assert.Equal("World", Encoding.UTF8.GetString(summary.Received[1].Payload.ToArray()));
Assert.Equal(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }, summary.Received[2].Payload.ToArray());
await pair.ServerSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure));
await client;
}
}
}
[Fact]
public async Task ExecuteReturnsWhenCloseFrameReceived()
{
using (var pair = WebSocketPair.Create())
{
var client = pair.ClientSocket.ExecuteAndCaptureFramesAsync();
await pair.ClientSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.InvalidMessageType, "Abc"));
var serverSummary = await pair.ServerSocket.ExecuteAndCaptureFramesAsync();
await pair.ServerSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure, "Ok"));
var clientSummary = await client;
Assert.Equal(0, serverSummary.Received.Count);
Assert.Equal(WebSocketCloseStatus.InvalidMessageType, serverSummary.CloseResult.Status);
Assert.Equal("Abc", serverSummary.CloseResult.Description);
Assert.Equal(0, clientSummary.Received.Count);
Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.Status);
Assert.Equal("Ok", clientSummary.CloseResult.Description);
}
}
[Fact]
public async Task AbnormalTerminationOfInboundChannelCausesExecuteToThrow()
{
using (var pair = WebSocketPair.Create())
{
var client = pair.ClientSocket.ExecuteAndCaptureFramesAsync();
var server = pair.ServerSocket.ExecuteAndCaptureFramesAsync();
pair.TerminateFromClient(new InvalidOperationException("It broke!"));
await Assert.ThrowsAsync<InvalidOperationException>(() => server);
}
}
[Fact]
public async Task StateTransitions()
{
using (var pair = WebSocketPair.Create())
{
// Initial State
Assert.Equal(WebSocketConnectionState.Created, pair.ServerSocket.State);
Assert.Equal(WebSocketConnectionState.Created, pair.ClientSocket.State);
// Start the sockets
var serverReceiving = new TaskCompletionSource<object>();
var clientReceiving = new TaskCompletionSource<object>();
var server = pair.ServerSocket.ExecuteAsync(frame => serverReceiving.TrySetResult(null));
var client = pair.ClientSocket.ExecuteAsync(frame => clientReceiving.TrySetResult(null));
// Send a frame from each and verify that the state transitioned.
// We need to do this because it's the only way to correctly wait for the state transition (which happens asynchronously in ExecuteAsync)
await pair.ClientSocket.SendAsync(CreateTextFrame("Hello"));
await pair.ServerSocket.SendAsync(CreateTextFrame("Hello"));
await Task.WhenAll(serverReceiving.Task, clientReceiving.Task);
// Check state
Assert.Equal(WebSocketConnectionState.Connected, pair.ServerSocket.State);
Assert.Equal(WebSocketConnectionState.Connected, pair.ClientSocket.State);
// Close the server socket
await pair.ServerSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure));
await client;
// Check state
Assert.Equal(WebSocketConnectionState.CloseSent, pair.ServerSocket.State);
Assert.Equal(WebSocketConnectionState.CloseReceived, pair.ClientSocket.State);
// Close the client socket
await pair.ClientSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure));
await server;
// Check state
Assert.Equal(WebSocketConnectionState.Closed, pair.ServerSocket.State);
Assert.Equal(WebSocketConnectionState.Closed, pair.ClientSocket.State);
// Verify we can't restart the connection or send a message
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await pair.ServerSocket.ExecuteAsync(f => { }));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await pair.ClientSocket.SendAsync(CreateTextFrame("Nope")));
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await pair.ClientSocket.CloseAsync(new WebSocketCloseResult(WebSocketCloseStatus.NormalClosure)));
}
}
}
}

View File

@ -0,0 +1,212 @@
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public partial class WebSocketConnectionTests
{
public class TheReceiveAsyncMethod
{
[Theory]
[InlineData(new byte[] { 0x11, 0x00 }, "", true)]
[InlineData(new byte[] { 0x11, 0x0A, 0x48, 0x65, 0x6C, 0x6C, 0x6F }, "Hello", true)]
[InlineData(new byte[] { 0x11, 0x0B, 0x1, 0x2, 0x3, 0x4, 0x48 ^ 0x1, 0x65 ^ 0x2, 0x6C ^ 0x3, 0x6C ^ 0x4, 0x6F ^ 0x1 }, "Hello", true)]
[InlineData(new byte[] { 0x10, 0x00 }, "", false)]
[InlineData(new byte[] { 0x10, 0x0A, 0x48, 0x65, 0x6C, 0x6C, 0x6F }, "Hello", false)]
[InlineData(new byte[] { 0x10, 0x0B, 0x1, 0x2, 0x3, 0x4, 0x48 ^ 0x1, 0x65 ^ 0x2, 0x6C ^ 0x3, 0x6C ^ 0x4, 0x6F ^ 0x1 }, "Hello", false)]
public Task ReadTextFrames(byte[] rawFrame, string message, bool endOfMessage)
{
return RunSingleFrameTest(
rawFrame,
endOfMessage,
WebSocketOpcode.Text,
b => Assert.Equal(message, Encoding.UTF8.GetString(b)));
}
[Theory]
// Opcode = Binary
[InlineData(new byte[] { 0x21, 0x00 }, new byte[0], WebSocketOpcode.Binary, true)]
[InlineData(new byte[] { 0x21, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Binary, true)]
[InlineData(new byte[] { 0x21, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Binary, true)]
[InlineData(new byte[] { 0x20, 0x00 }, new byte[0], WebSocketOpcode.Binary, false)]
[InlineData(new byte[] { 0x20, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Binary, false)]
[InlineData(new byte[] { 0x20, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Binary, false)]
// Opcode = Continuation
[InlineData(new byte[] { 0x01, 0x00 }, new byte[0], WebSocketOpcode.Continuation, true)]
[InlineData(new byte[] { 0x01, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Continuation, true)]
[InlineData(new byte[] { 0x01, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Continuation, true)]
[InlineData(new byte[] { 0x00, 0x00 }, new byte[0], WebSocketOpcode.Continuation, false)]
[InlineData(new byte[] { 0x00, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Continuation, false)]
[InlineData(new byte[] { 0x00, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Continuation, false)]
// Opcode = Ping
[InlineData(new byte[] { 0x91, 0x00 }, new byte[0], WebSocketOpcode.Ping, true)]
[InlineData(new byte[] { 0x91, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Ping, true)]
[InlineData(new byte[] { 0x91, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Ping, true)]
[InlineData(new byte[] { 0x90, 0x00 }, new byte[0], WebSocketOpcode.Ping, false)]
[InlineData(new byte[] { 0x90, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Ping, false)]
[InlineData(new byte[] { 0x90, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Ping, false)]
// Opcode = Pong
[InlineData(new byte[] { 0xA1, 0x00 }, new byte[0], WebSocketOpcode.Pong, true)]
[InlineData(new byte[] { 0xA1, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Pong, true)]
[InlineData(new byte[] { 0xA1, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Pong, true)]
[InlineData(new byte[] { 0xA0, 0x00 }, new byte[0], WebSocketOpcode.Pong, false)]
[InlineData(new byte[] { 0xA0, 0x0A, 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Pong, false)]
[InlineData(new byte[] { 0xA0, 0x0B, 0x1, 0x2, 0x3, 0x4, 0xDE ^ 0x1, 0xAD ^ 0x2, 0xBE ^ 0x3, 0xEF ^ 0x4, 0xAB ^ 0x1 }, new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, WebSocketOpcode.Pong, false)]
public Task ReadBinaryFormattedFrames(byte[] rawFrame, byte[] payload, WebSocketOpcode opcode, bool endOfMessage)
{
return RunSingleFrameTest(
rawFrame,
endOfMessage,
opcode,
b => Assert.Equal(payload, b));
}
[Fact]
public async Task ReadMultipleFramesAcrossMultipleBuffers()
{
var result = await RunReceiveTest(
producer: async (channel, cancellationToken) =>
{
await channel.WriteAsync(new byte[] { 0x20, 0x0A }.Slice());
await channel.WriteAsync(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB, 0x01, 0x0A }.Slice());
await channel.WriteAsync(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }.Slice());
await channel.WriteAsync(new byte[] { 0xAB }.Slice());
});
Assert.Equal(2, result.Received.Count);
Assert.False(result.Received[0].EndOfMessage);
Assert.Equal(WebSocketOpcode.Binary, result.Received[0].Opcode);
Assert.Equal(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, result.Received[0].Payload.ToArray());
Assert.True(result.Received[1].EndOfMessage);
Assert.Equal(WebSocketOpcode.Continuation, result.Received[1].Opcode);
Assert.Equal(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF, 0xAB }, result.Received[1].Payload.ToArray());
}
[Fact]
public async Task Read16BitPayloadLength()
{
var expectedPayload = new byte[1024];
new Random().NextBytes(expectedPayload);
var result = await RunReceiveTest(
producer: async (channel, cancellationToken) =>
{
// Header: (Opcode=Binary, Fin=true), (Mask=false, Len=126), (16-bit big endian length)
await channel.WriteAsync(new byte[] { 0x21, 0xFC, 0x04, 0x00 });
await channel.WriteAsync(expectedPayload);
});
Assert.Equal(1, result.Received.Count);
var frame = result.Received[0];
Assert.True(frame.EndOfMessage);
Assert.Equal(WebSocketOpcode.Binary, frame.Opcode);
Assert.Equal(expectedPayload, frame.Payload.ToArray());
}
[Fact]
public async Task Read64bitPayloadLength()
{
// Allocating an actual (2^32 + 1) byte payload is crazy for this test. We just need to test that we can USE a 64-bit length
var expectedPayload = new byte[1024];
new Random().NextBytes(expectedPayload);
var result = await RunReceiveTest(
producer: async (channel, cancellationToken) =>
{
// Header: (Opcode=Binary, Fin=true), (Mask=false, Len=127), (64-bit big endian length)
await channel.WriteAsync(new byte[] { 0x21, 0xFE, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00 });
await channel.WriteAsync(expectedPayload);
});
Assert.Equal(1, result.Received.Count);
var frame = result.Received[0];
Assert.True(frame.EndOfMessage);
Assert.Equal(WebSocketOpcode.Binary, frame.Opcode);
Assert.Equal(expectedPayload, frame.Payload.ToArray());
}
private static async Task RunSingleFrameTest(byte[] rawFrame, bool endOfMessage, WebSocketOpcode expectedOpcode, Action<byte[]> payloadAssert)
{
var result = await RunReceiveTest(
producer: async (channel, cancellationToken) =>
{
await channel.WriteAsync(rawFrame.Slice());
});
var frames = result.Received;
Assert.Equal(1, frames.Count);
var frame = frames[0];
Assert.Equal(endOfMessage, frame.EndOfMessage);
Assert.Equal(expectedOpcode, frame.Opcode);
payloadAssert(frame.Payload.ToArray());
}
private static async Task<WebSocketConnectionSummary> RunReceiveTest(Func<IWritableChannel, CancellationToken, Task> producer)
{
using (var factory = new ChannelFactory())
{
var outbound = factory.CreateChannel();
var inbound = factory.CreateChannel();
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
// Timeout for the test, but only if the debugger is not attached.
if (!Debugger.IsAttached)
{
cts.CancelAfter(TimeSpan.FromSeconds(5));
}
var producerTask = Task.Run(async () =>
{
await producer(inbound, cancellationToken);
inbound.CompleteWriter();
}, cancellationToken);
var consumerTask = Task.Run(async () =>
{
var connection = new WebSocketConnection(inbound, outbound);
using (cancellationToken.Register(() => connection.Dispose()))
using (connection)
{
// Receive frames until we're closed
return await connection.ExecuteAndCaptureFramesAsync();
}
}, cancellationToken);
await Task.WhenAll(producerTask, consumerTask);
return consumerTask.Result;
}
}
}
private static WebSocketFrame CreateTextFrame(string message)
{
var payload = Encoding.UTF8.GetBytes(message);
return CreateFrame(endOfMessage: true, opcode: WebSocketOpcode.Text, payload: payload);
}
private static WebSocketFrame CreateBinaryFrame(byte[] payload)
{
return CreateFrame(endOfMessage: true, opcode: WebSocketOpcode.Binary, payload: payload);
}
private static WebSocketFrame CreateFrame(bool endOfMessage, WebSocketOpcode opcode, byte[] payload)
{
return new WebSocketFrame(endOfMessage, opcode, payload: ReadableBuffer.Create(payload, 0, payload.Length));
}
}
}

View File

@ -0,0 +1,225 @@
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Channels;
using Xunit;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
public partial class WebSocketConnectionTests
{
public class TheSendAsyncMethod
{
[Theory]
[InlineData("", true, new byte[] { 0x11, 0x00 })]
[InlineData("Hello", true, new byte[] { 0x11, 0x0A, 0x48, 0x65, 0x6C, 0x6C, 0x6F })]
[InlineData("", false, new byte[] { 0x10, 0x00 })]
[InlineData("Hello", false, new byte[] { 0x10, 0x0A, 0x48, 0x65, 0x6C, 0x6C, 0x6F })]
public async Task WriteTextFrames(string message, bool endOfMessage, byte[] expectedRawFrame)
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
var payload = Encoding.UTF8.GetBytes(message);
await socket.SendAsync(CreateFrame(
endOfMessage,
opcode: WebSocketOpcode.Text,
payload: payload));
}, masked: false);
Assert.Equal(expectedRawFrame, data);
}
[Theory]
// Opcode = Binary
[InlineData(new byte[0], WebSocketOpcode.Binary, true, new byte[] { 0x21, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Binary, true, new byte[] { 0x21, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
[InlineData(new byte[0], WebSocketOpcode.Binary, false, new byte[] { 0x20, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Binary, false, new byte[] { 0x20, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
// Opcode = Continuation
[InlineData(new byte[0], WebSocketOpcode.Continuation, true, new byte[] { 0x01, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Continuation, true, new byte[] { 0x01, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
[InlineData(new byte[0], WebSocketOpcode.Continuation, false, new byte[] { 0x00, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Continuation, false, new byte[] { 0x00, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
// Opcode = Ping
[InlineData(new byte[0], WebSocketOpcode.Ping, true, new byte[] { 0x91, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Ping, true, new byte[] { 0x91, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
[InlineData(new byte[0], WebSocketOpcode.Ping, false, new byte[] { 0x90, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Ping, false, new byte[] { 0x90, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
// Opcode = Pong
[InlineData(new byte[0], WebSocketOpcode.Pong, true, new byte[] { 0xA1, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Pong, true, new byte[] { 0xA1, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
[InlineData(new byte[0], WebSocketOpcode.Pong, false, new byte[] { 0xA0, 0x00 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Pong, false, new byte[] { 0xA0, 0x0A, 0xA, 0xB, 0xC, 0xD, 0xE })]
public async Task WriteBinaryFormattedFrames(byte[] payload, WebSocketOpcode opcode, bool endOfMessage, byte[] expectedRawFrame)
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
await socket.SendAsync(CreateFrame(
endOfMessage,
opcode,
payload: payload));
}, masked: false);
Assert.Equal(expectedRawFrame, data);
}
[Theory]
[InlineData("", new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x11, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData("Hello", new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x11, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x48 ^ 0x01, 0x65 ^ 0x02, 0x6C ^ 0x03, 0x6C ^ 0x04, 0x6F ^ 0x01 })]
public async Task WriteMaskedTextFrames(string message, byte[] maskingKey, byte[] expectedRawFrame)
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
var payload = Encoding.UTF8.GetBytes(message);
await socket.SendAsync(CreateFrame(
endOfMessage: true,
opcode: WebSocketOpcode.Text,
payload: payload));
}, maskingKey: maskingKey);
Assert.Equal(expectedRawFrame, data);
}
[Theory]
// Opcode = Binary
[InlineData(new byte[0], WebSocketOpcode.Binary, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x21, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Binary, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x21, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
[InlineData(new byte[0], WebSocketOpcode.Binary, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x20, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Binary, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x20, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
// Opcode = Continuation
[InlineData(new byte[0], WebSocketOpcode.Continuation, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x01, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Continuation, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x01, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
[InlineData(new byte[0], WebSocketOpcode.Continuation, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x00, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Continuation, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x00, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
// Opcode = Ping
[InlineData(new byte[0], WebSocketOpcode.Ping, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x91, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Ping, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x91, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
[InlineData(new byte[0], WebSocketOpcode.Ping, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x90, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Ping, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x90, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
// Opcode = Pong
[InlineData(new byte[0], WebSocketOpcode.Pong, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0xA1, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Pong, true, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0xA1, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
[InlineData(new byte[0], WebSocketOpcode.Pong, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0xA0, 0x01, 0x01, 0x02, 0x03, 0x04 })]
[InlineData(new byte[] { 0xA, 0xB, 0xC, 0xD, 0xE }, WebSocketOpcode.Pong, false, new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0xA0, 0x0B, 0x01, 0x02, 0x03, 0x04, 0x0A ^ 0x01, 0x0B ^ 0x02, 0x0C ^ 0x03, 0x0D ^ 0x04, 0x0E ^ 0x01 })]
public async Task WriteMaskedBinaryFormattedFrames(byte[] payload, WebSocketOpcode opcode, bool endOfMessage, byte[] maskingKey, byte[] expectedRawFrame)
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
await socket.SendAsync(CreateFrame(
endOfMessage,
opcode,
payload: payload));
}, maskingKey: maskingKey);
Assert.Equal(expectedRawFrame, data);
}
[Fact]
public async Task WriteRandomMaskedFrame()
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
await socket.SendAsync(CreateFrame(
endOfMessage: true,
opcode: WebSocketOpcode.Binary,
payload: new byte[] { 0x0A, 0x0B, 0x0C, 0x0D, 0x0E }));
}, masked: true);
// Verify the header
Assert.Equal(0x21, data[0]);
Assert.Equal(0x0B, data[1]);
// We don't know the mask, so we have to read it in order to verify this frame
var mask = data.Slice(2, 4);
var actualPayload = data.Slice(6);
// Unmask the payload
for (int i = 0; i < actualPayload.Length; i++)
{
actualPayload[i] = (byte)(mask[i % 4] ^ actualPayload[i]);
}
Assert.Equal(new byte[] { 0x0A, 0x0B, 0x0C, 0x0D, 0x0E }, actualPayload.ToArray());
}
[Theory]
[InlineData(WebSocketCloseStatus.MandatoryExtension, "Hi", null, new byte[] { 0x81, 0x08, 0x03, 0xF2, (byte)'H', (byte)'i' })]
[InlineData(WebSocketCloseStatus.PolicyViolation, "", null, new byte[] { 0x81, 0x04, 0x03, 0xF0 })]
[InlineData(WebSocketCloseStatus.MandatoryExtension, "Hi", new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x81, 0x09, 0x01, 0x02, 0x03, 0x04, 0x03 ^ 0x01, 0xF2 ^ 0x02, (byte)'H' ^ 0x03, (byte)'i' ^ 0x04 })]
[InlineData(WebSocketCloseStatus.PolicyViolation, "", new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[] { 0x81, 0x05, 0x01, 0x02, 0x03, 0x04, 0x03 ^ 0x01, 0xF0 ^ 0x02 })]
public async Task WriteCloseFrames(WebSocketCloseStatus status, string description, byte[] maskingKey, byte[] expectedRawFrame)
{
var data = await RunSendTest(
producer: async (socket, cancellationToken) =>
{
await socket.CloseAsync(new WebSocketCloseResult(status, description));
}, maskingKey: maskingKey);
Assert.Equal(expectedRawFrame, data);
}
private static async Task<byte[]> RunSendTest(Func<WebSocketConnection, CancellationToken, Task> producer, bool masked = false, byte[] maskingKey = null)
{
using (var factory = new ChannelFactory())
{
var outbound = factory.CreateChannel();
var inbound = factory.CreateChannel();
var cts = new CancellationTokenSource();
// Timeout for the test, but only if the debugger is not attached.
if (!Debugger.IsAttached)
{
cts.CancelAfter(TimeSpan.FromSeconds(5));
}
var cancellationToken = cts.Token;
using (cancellationToken.Register(() => CompleteChannels(inbound, outbound)))
{
Task executeTask;
using (var connection = CreateConnection(inbound, outbound, masked, maskingKey))
{
executeTask = connection.ExecuteAsync(f =>
{
Assert.False(true, "Did not expect to receive any messages");
return Task.CompletedTask;
});
await producer(connection, cancellationToken);
inbound.CompleteWriter();
await executeTask;
}
var data = (await outbound.ReadToEndAsync()).ToArray();
inbound.CompleteReader();
CompleteChannels(outbound);
return data;
}
}
}
private static void CompleteChannels(params Channel[] channels)
{
foreach (var channel in channels)
{
channel.CompleteReader();
channel.CompleteWriter();
}
}
private static WebSocketConnection CreateConnection(Channel inbound, Channel outbound, bool masked, byte[] maskingKey)
{
return (maskingKey != null) ?
new WebSocketConnection(inbound, outbound, fixedMaskingKey: maskingKey) :
new WebSocketConnection(inbound, outbound, masked);
}
}
}
}

View File

@ -0,0 +1,50 @@
using System;
using Channels;
namespace Microsoft.Extensions.WebSockets.Internal.Tests
{
internal class WebSocketPair : IDisposable
{
private ChannelFactory _factory;
private Channel _serverToClient;
private Channel _clientToServer;
public IWebSocketConnection ClientSocket { get; }
public IWebSocketConnection ServerSocket { get; }
public WebSocketPair(ChannelFactory factory, Channel serverToClient, Channel clientToServer, IWebSocketConnection clientSocket, IWebSocketConnection serverSocket)
{
_factory = factory;
_serverToClient = serverToClient;
_clientToServer = clientToServer;
ClientSocket = clientSocket;
ServerSocket = serverSocket;
}
public static WebSocketPair Create()
{
// Create channels
var factory = new ChannelFactory();
var serverToClient = factory.CreateChannel();
var clientToServer = factory.CreateChannel();
var serverSocket = new WebSocketConnection(clientToServer, serverToClient, masked: true);
var clientSocket = new WebSocketConnection(serverToClient, clientToServer, masked: false);
return new WebSocketPair(factory, serverToClient, clientToServer, clientSocket, serverSocket);
}
public void Dispose()
{
_factory.Dispose();
ServerSocket.Dispose();
ClientSocket.Dispose();
}
public void TerminateFromClient(Exception ex = null)
{
_clientToServer.CompleteWriter(ex);
}
}
}

View File

@ -0,0 +1,30 @@
{
"buildOptions": {
"warningsAsErrors": true
},
"dependencies": {
"dotnet-test-xunit": "1.0.0-rc3-000000-01",
"Microsoft.Extensions.WebSockets.Internal": "0.1.0-*",
"xunit": "2.1.0"
},
"testRunner": "xunit",
"frameworks": {
"netcoreapp1.0": {
"dependencies": {
"Microsoft.NETCore.App": {
"version": "1.0.0",
"type": "platform"
}
},
"imports": [
"dnxcore50",
"portable-net451+win8"
]
},
"net46": {
"dependencies": {
"xunit.runner.console": "2.1.0"
}
}
}
}