React to CoreFxLab packages (#998)

This commit is contained in:
BrennanConroy 2017-11-13 15:05:35 -08:00 committed by Pavel Krymets
parent f21d107766
commit 792745ad98
76 changed files with 766 additions and 566 deletions

View File

@ -11,6 +11,11 @@
<SignAssembly>true</SignAssembly>
<PublicSign Condition="'$(OS)' != 'Windows_NT'">true</PublicSign>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<!-- This is an experimental version of the compiler. See https://github.com/dotnet/csharplang/issues/666 for more details. -->
<PackageReference Include="Microsoft.NETCore.Compilers" Version="$(MicrosoftNETCoreCompilersPackageVersion)" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -9,8 +9,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
public class MessageParserBenchmark
{
private static readonly Random Random = new Random();
private ReadOnlyBuffer<byte> _binaryInput;
private ReadOnlyBuffer<byte> _textInput;
private ReadOnlyMemory<byte> _binaryInput;
private ReadOnlyMemory<byte> _textInput;
[Params(32, 64)]
public int ChunkSize { get; set; }

View File

@ -50,21 +50,23 @@
<MicrosoftExtensionsOptionsPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsOptionsPackageVersion>
<MicrosoftExtensionsSecurityHelperSourcesPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsSecurityHelperSourcesPackageVersion>
<MicrosoftNETCoreApp20PackageVersion>2.0.0</MicrosoftNETCoreApp20PackageVersion>
<MicrosoftNETCoreCompilersPackageVersion>2.6.0-beta2-62211-02</MicrosoftNETCoreCompilersPackageVersion>
<MicrosoftNETTestSdkPackageVersion>15.3.0</MicrosoftNETTestSdkPackageVersion>
<MoqPackageVersion>4.7.49</MoqPackageVersion>
<MsgPackCliPackageVersion>0.9.0-beta2</MsgPackCliPackageVersion>
<NewtonsoftJsonPackageVersion>10.0.1</NewtonsoftJsonPackageVersion>
<StackExchangeRedisStrongNamePackageVersion>1.2.4</StackExchangeRedisStrongNamePackageVersion>
<SystemBinaryPackageVersion>0.1.0-e170811-6</SystemBinaryPackageVersion>
<SystemBuffersPrimitivesPackageVersion>0.1.0-e170811-6</SystemBuffersPrimitivesPackageVersion>
<SystemIOPipelinesExtensionsPackageVersion>0.1.0-e170811-6</SystemIOPipelinesExtensionsPackageVersion>
<SystemIOPipelinesPackageVersion>0.1.0-e170811-6</SystemIOPipelinesPackageVersion>
<SystemMemoryPackageVersion>4.4.0-preview3-25519-03</SystemMemoryPackageVersion>
<SystemNumericsVectorsPackageVersion>4.4.0</SystemNumericsVectorsPackageVersion>
<SystemBinaryPackageVersion>0.1.0-alpha-002</SystemBinaryPackageVersion>
<SystemBuffersPrimitivesPackageVersion>0.1.0-alpha-002</SystemBuffersPrimitivesPackageVersion>
<SystemIOPipelinesExtensionsPackageVersion>0.1.0-alpha-002</SystemIOPipelinesExtensionsPackageVersion>
<SystemIOPipelinesPackageVersion>0.1.0-alpha-002</SystemIOPipelinesPackageVersion>
<SystemMemoryPackageVersion>4.5.0-preview1-25902-08</SystemMemoryPackageVersion>
<SystemNumericsVectorsPackageVersion>4.5.0-preview1-25902-08</SystemNumericsVectorsPackageVersion>
<SystemReactiveLinqPackageVersion>3.1.1</SystemReactiveLinqPackageVersion>
<SystemReflectionEmitPackageVersion>4.3.0</SystemReflectionEmitPackageVersion>
<SystemRuntimeCompilerServicesUnsafePackageVersion>4.4.0</SystemRuntimeCompilerServicesUnsafePackageVersion>
<SystemThreadingTasksChannelsPackageVersion>0.1.0-e170811-6</SystemThreadingTasksChannelsPackageVersion>
<SystemRuntimeCompilerServicesUnsafePackageVersion>4.5.0-preview1-25902-08</SystemRuntimeCompilerServicesUnsafePackageVersion>
<SystemThreadingChannelsPackageVersion>4.5.0-preview1-25902-08</SystemThreadingChannelsPackageVersion>
<SystemThreadingTasksExtensionsPackageVersion>4.4.0</SystemThreadingTasksExtensionsPackageVersion>
<XunitPackageVersion>2.3.0</XunitPackageVersion>
<XunitRunnerVisualStudioPackageVersion>2.3.0</XunitRunnerVisualStudioPackageVersion>
</PropertyGroup>

View File

@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.Sockets;
namespace Microsoft.AspNetCore.SignalR.Test.Server
@ -11,7 +12,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
{
public async override Task OnConnectedAsync(ConnectionContext connection)
{
await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
}
}
}

View File

@ -40,7 +40,7 @@ namespace SocialWeather
var ms = new MemoryStream();
await formatter.WriteAsync(data, ms);
connection.Transport.Out.TryWrite(ms.ToArray());
connection.Transport.Writer.TryWrite(ms.ToArray());
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
@ -34,9 +34,9 @@ namespace SocialWeather
var formatter = _formatterResolver.GetFormatter<WeatherReport>(
(string)connection.Metadata["formatType"]);
while (await connection.Transport.In.WaitToReadAsync())
while (await connection.Transport.Reader.WaitToReadAsync())
{
if (connection.Transport.In.TryRead(out var buffer))
if (connection.Transport.Reader.TryRead(out var buffer))
{
var stream = new MemoryStream();
await stream.WriteAsync(buffer, 0, buffer.Length);

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
@ -20,9 +20,9 @@ namespace SocketsSample.EndPoints
try
{
while (await connection.Transport.In.WaitToReadAsync())
while (await connection.Transport.Reader.WaitToReadAsync())
{
if (connection.Transport.In.TryRead(out var buffer))
if (connection.Transport.Reader.TryRead(out var buffer))
{
// We can avoid the copy here but we'll deal with that later
var text = Encoding.UTF8.GetString(buffer);
@ -50,7 +50,7 @@ namespace SocketsSample.EndPoints
foreach (var c in Connections)
{
tasks.Add(c.Transport.Out.WriteAsync(payload));
tasks.Add(c.Transport.Writer.WriteAsync(payload));
}
return Task.WhenAll(tasks);

View File

@ -1,7 +1,7 @@
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR;
namespace SocketsSample.Hubs
@ -15,7 +15,7 @@ namespace SocketsSample.Hubs
.Take(count);
}
public ReadableChannel<int> ChannelCounter(int count, int delay)
public ChannelReader<int> ChannelCounter(int count, int delay)
{
var channel = Channel.CreateUnbounded<int>();
@ -23,14 +23,14 @@ namespace SocketsSample.Hubs
{
for (var i = 0; i < count; i++)
{
await channel.Out.WriteAsync(i);
await channel.Writer.WriteAsync(i);
await Task.Delay(delay);
}
channel.Out.TryComplete();
channel.Writer.TryComplete();
});
return channel.In;
return channel.Reader;
}
}
}

View File

@ -8,7 +8,7 @@ using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Encoders;
@ -145,12 +145,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
return new Subscription(invocationHandler, invocationList);
}
public async Task<ReadableChannel<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
public async Task<ChannelReader<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
{
return await StreamAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
}
private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
private async Task<ChannelReader<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
if (!_startCalled)
{

View File

@ -1,71 +1,71 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.SignalR.Client
{
public static partial class HubConnectionExtensions
{
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
}
public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
{
return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
}
public static async Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
public static async Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (hubConnection == null)
{
@ -85,9 +85,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
while (inputChannel.TryRead(out var item))
{
while (!outputChannel.Out.TryWrite((TResult)item))
while (!outputChannel.Writer.TryWrite((TResult)item))
{
if (!await outputChannel.Out.WaitToWriteAsync())
if (!await outputChannel.Writer.WaitToWriteAsync())
{
// Failed to write to the output channel because it was closed. Nothing really we can do but abort here.
return;
@ -101,18 +101,18 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
catch (Exception ex)
{
outputChannel.Out.TryComplete(ex);
outputChannel.Writer.TryComplete(ex);
}
finally
{
// This will safely no-op if the catch block above ran.
outputChannel.Out.TryComplete();
outputChannel.Writer.TryComplete();
}
}
_ = RunChannel();
return outputChannel.In;
return outputChannel.Reader;
}
}
}

View File

@ -4,7 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.SignalR.Client
{

View File

@ -4,7 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.Extensions.Logging;
@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
}
public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId,
ILoggerFactory loggerFactory, HubConnection hubConnection, out ReadableChannel<object> result)
ILoggerFactory loggerFactory, HubConnection hubConnection, out ChannelReader<object> result)
{
var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory, hubConnection);
result = req.Result;
@ -75,7 +75,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
}
public ReadableChannel<object> Result => _channel.In;
public ChannelReader<object> Result => _channel.Reader;
public override void Complete(CompletionMessage completionMessage)
{
@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
if (completionMessage.Result != null)
{
Logger.ReceivedUnexpectedComplete(InvocationId);
_channel.Out.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
_channel.Writer.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
}
if (!string.IsNullOrEmpty(completionMessage.Error))
@ -92,22 +92,22 @@ namespace Microsoft.AspNetCore.SignalR.Client
return;
}
_channel.Out.TryComplete();
_channel.Writer.TryComplete();
}
public override void Fail(Exception exception)
{
Logger.InvocationFailed(InvocationId);
_channel.Out.TryComplete(exception);
_channel.Writer.TryComplete(exception);
}
public override async ValueTask<bool> StreamItem(object item)
{
try
{
while (!_channel.Out.TryWrite(item))
while (!_channel.Writer.TryWrite(item))
{
if (!await _channel.Out.WaitToWriteAsync())
if (!await _channel.Writer.WaitToWriteAsync())
{
return false;
}
@ -122,7 +122,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
protected override void Cancel()
{
_channel.Out.TryComplete(new OperationCanceledException("Invocation terminated"));
_channel.Writer.TryComplete(new OperationCanceledException("Invocation terminated"));
}
}

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
{
public byte[] Decode(byte[] payload)
{
var buffer = new ReadOnlyBuffer<byte>(payload);
var buffer = new ReadOnlyMemory<byte>(payload);
LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message);
return Convert.FromBase64String(Encoding.UTF8.GetString(message.ToArray()));

View File

@ -14,20 +14,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
/// Attempts to parse a message from the buffer. Returns 'false' if there is not enough data to complete a message. Throws an
/// exception if there is a format error in the provided data.
/// </summary>
public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
{
payload = default;
var span = buffer.Span;
payload = default(ReadOnlyMemory<byte>);
if (!TryReadLength(span, out var index, out var length))
if (!TryReadLength(buffer.Span, out var index, out var length))
{
return false;
}
var remaining = buffer.Slice(index);
span = remaining.Span;
if (!TryReadDelimiter(span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
if (!TryReadDelimiter(remaining.Span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
{
return false;
}

View File

@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
var buffer = ArrayPool<byte>.Shared.Rent(lenNumBytes + payload.Length);
var bufferSpan = buffer.AsSpan();
new Span<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
new ReadOnlySpan<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
bufferSpan = bufferSpan.Slice(lenNumBytes);
payload.CopyTo(bufferSpan);
output.Write(buffer, 0, lenNumBytes + payload.Length);

View File

@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
private static int[] _numBitsToShift = new[] { 0, 7, 14, 21, 28 };
private const int MaxLengthPrefixSize = 5;
public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
{
payload = default;

View File

@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
{
public static class TextMessageParser
{
public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
{
payload = default;

View File

@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
ProtocolType Type { get; }
bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
void WriteMessage(HubMessage message, Stream output);
}

View File

@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
public ProtocolType Type => ProtocolType.Text;
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();

View File

@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
_serializationContext = serializationContext;
}
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();

View File

@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
}
}
public static bool TryParseMessage(ReadOnlyBuffer<byte> input, out NegotiationMessage negotiationMessage)
public static bool TryParseMessage(ReadOnlyMemory<byte> input, out NegotiationMessage negotiationMessage)
{
if (!TextMessageParser.TryParseMessage(ref input, out var payload))
{

View File

@ -10,7 +10,6 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
<PackageReference Include="MsgPack.Cli" Version="$(MsgPackCliPackageVersion)" />
<PackageReference Include="System.Binary" Version="$(SystemBinaryPackageVersion)" />
<PackageReference Include="System.Buffers.Primitives" Version="$(SystemBuffersPrimitivesPackageVersion)" />
<PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
<PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -8,7 +8,7 @@ using System.Runtime.ExceptionServices;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.SignalR.Features;
using Microsoft.AspNetCore.SignalR.Internal;
@ -22,12 +22,12 @@ namespace Microsoft.AspNetCore.SignalR
{
private static Action<object> _abortedCallback = AbortConnection;
private readonly WritableChannel<HubMessage> _output;
private readonly ChannelWriter<HubMessage> _output;
private readonly ConnectionContext _connectionContext;
private readonly CancellationTokenSource _connectionAbortedTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<object> _abortCompletedTcs = new TaskCompletionSource<object>();
public HubConnectionContext(WritableChannel<HubMessage> output, ConnectionContext connectionContext)
public HubConnectionContext(ChannelWriter<HubMessage> output, ConnectionContext connectionContext)
{
_output = output;
_connectionContext = connectionContext;
@ -37,7 +37,7 @@ namespace Microsoft.AspNetCore.SignalR
private IHubFeature HubFeature => Features.Get<IHubFeature>();
// Used by the HubEndPoint only
internal ReadableChannel<byte[]> Input => _connectionContext.Transport;
internal ChannelReader<byte[]> Input => _connectionContext.Transport;
internal ExceptionDispatchInfo AbortException { get; private set; }
@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR
public virtual HubProtocolReaderWriter ProtocolReaderWriter { get; set; }
public virtual WritableChannel<HubMessage> Output => _output;
public virtual ChannelWriter<HubMessage> Output => _output;
// Currently used only for streaming methods
internal ConcurrentDictionary<string, CancellationTokenSource> ActiveRequestCancellationSources { get; } = new ConcurrentDictionary<string, CancellationTokenSource>();

View File

@ -9,7 +9,7 @@ using System.Reflection;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR.Core;
using Microsoft.AspNetCore.SignalR.Core.Internal;
@ -84,14 +84,14 @@ namespace Microsoft.AspNetCore.SignalR
{
try
{
while (await output.In.WaitToReadAsync())
while (await output.Reader.WaitToReadAsync())
{
while (output.In.TryRead(out var hubMessage))
while (output.Reader.TryRead(out var hubMessage))
{
var buffer = protocolReaderWriter.WriteMessage(hubMessage);
while (await connection.Transport.Out.WaitToWriteAsync())
while (await connection.Transport.Writer.WaitToWriteAsync())
{
if (connection.Transport.Out.TryWrite(buffer))
if (connection.Transport.Writer.TryWrite(buffer))
{
break;
}
@ -117,7 +117,7 @@ namespace Microsoft.AspNetCore.SignalR
await _lifetimeManager.OnDisconnectedAsync(connectionContext);
// Nothing should be writing to the HubConnectionContext
output.Out.TryComplete();
output.Writer.TryComplete();
// This should unwind once we complete the output
await writingOutputTask;
@ -461,7 +461,7 @@ namespace Microsoft.AspNetCore.SignalR
private static bool IsChannel(Type type, out Type payloadType)
{
var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>));
var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ChannelReader<>));
if (channelType == null)
{
payloadType = null;

View File

@ -6,7 +6,7 @@ using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.SignalR.Internal
{
@ -21,6 +21,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(FromObservable)) && m.IsGenericMethod);
private static readonly MethodInfo _getAsyncEnumeratorMethod = typeof(AsyncEnumeratorAdapters)
.GetRuntimeMethods()
.Single(m => m.Name.Equals(nameof(GetAsyncEnumerator)) && m.IsGenericMethod);
public static IAsyncEnumerator<object> FromObservable(object observable, Type observableInterface, CancellationToken cancellationToken)
{
// TODO: Cache expressions by observable.GetType()?
@ -34,20 +38,19 @@ namespace Microsoft.AspNetCore.SignalR.Internal
// TODO: Allow bounding and optimizations?
var channel = Channel.CreateUnbounded<object>();
var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Out, cancellationToken));
var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Writer, cancellationToken));
// Dispose the subscription when the token is cancelled
cancellationToken.Register(state => ((IDisposable)state).Dispose(), subscription);
return channel.In.GetAsyncEnumerator(cancellationToken);
return GetAsyncEnumerator(channel.Reader, cancellationToken);
}
public static IAsyncEnumerator<object> FromChannel(object readableChannelOfT, Type payloadType, CancellationToken cancellationToken)
{
var enumerator = readableChannelOfT
.GetType()
.GetRuntimeMethod("GetAsyncEnumerator", new[] { typeof(CancellationToken) })
.Invoke(readableChannelOfT, new object[] { cancellationToken });
var enumerator = _getAsyncEnumeratorMethod
.MakeGenericMethod(payloadType)
.Invoke(null, new object[] { readableChannelOfT, cancellationToken });
if (payloadType.IsValueType)
{
@ -68,10 +71,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
private class ChannelObserver<T> : IObserver<T>
{
private WritableChannel<object> _output;
private ChannelWriter<object> _output;
private CancellationToken _cancellationToken;
public ChannelObserver(WritableChannel<object> output, CancellationToken cancellationToken)
public ChannelObserver(ChannelWriter<object> output, CancellationToken cancellationToken)
{
_output = output;
_cancellationToken = cancellationToken;
@ -125,5 +128,66 @@ namespace Microsoft.AspNetCore.SignalR.Internal
public object Current => _input.Current;
public Task<bool> MoveNextAsync() => _input.MoveNextAsync();
}
public static IAsyncEnumerator<T> GetAsyncEnumerator<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default(CancellationToken))
{
return new AsyncEnumerator<T>(channel, cancellationToken);
}
/// <summary>Provides an async enumerator for the data in a channel.</summary>
internal class AsyncEnumerator<T> : IAsyncEnumerator<T>
{
/// <summary>The channel being enumerated.</summary>
private readonly ChannelReader<T> _channel;
/// <summary>Cancellation token used to cancel the enumeration.</summary>
private readonly CancellationToken _cancellationToken;
/// <summary>The current element of the enumeration.</summary>
private T _current;
internal AsyncEnumerator(ChannelReader<T> channel, CancellationToken cancellationToken)
{
_channel = channel;
_cancellationToken = cancellationToken;
}
public T Current => _current;
public Task<bool> MoveNextAsync()
{
ValueTask<T> result = _channel.ReadAsync(_cancellationToken);
if (result.IsCompletedSuccessfully)
{
_current = result.Result;
return Task.FromResult(true);
}
return result.AsTask().ContinueWith((t, s) =>
{
var thisRef = (AsyncEnumerator<T>)s;
if (t.IsFaulted && t.Exception.InnerException is ChannelClosedException cce && cce.InnerException == null)
{
return false;
}
thisRef._current = t.GetAwaiter().GetResult();
return true;
}, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
}
}
}
/// <summary>Represents an enumerator accessed asynchronously.</summary>
/// <typeparam name="T">Specifies the type of the data enumerated.</typeparam>
internal interface IAsyncEnumerator<out T>
{
/// <summary>Asynchronously move the enumerator to the next element.</summary>
/// <returns>
/// A task that returns true if the enumerator was successfully advanced to the next item,
/// or false if no more data was available in the collection.
/// </returns>
Task<bool> MoveNextAsync();
/// <summary>Gets the current element being enumerated.</summary>
T Current { get; }
}
}

View File

@ -1,8 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.Sockets.Internal
{
@ -24,20 +24,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
public Channel<T> Input { get; }
public Channel<T> Output { get; }
public override ReadableChannel<T> In => Input;
public override WritableChannel<T> Out => Output;
public ChannelConnection(Channel<T> input, Channel<T> output)
{
Reader = input.Reader;
Input = input;
Writer = output.Writer;
Output = output;
}
public void Dispose()
{
Input.Out.TryComplete();
Output.Out.TryComplete();
Input.Writer.TryComplete();
Output.Writer.TryComplete();
}
}
@ -46,20 +45,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
public Channel<TIn> Input { get; }
public Channel<TOut> Output { get; }
public override ReadableChannel<TIn> In => Input;
public override WritableChannel<TOut> Out => Output;
public ChannelConnection(Channel<TIn> input, Channel<TOut> output)
{
Reader = input.Reader;
Input = input;
Writer = output.Writer;
Output = output;
}
public void Dispose()
{
Input.Out.TryComplete();
Output.Out.TryComplete();
Input.Writer.TryComplete();
Output.Writer.TryComplete();
}
}
}

View File

@ -0,0 +1,47 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.SignalR.Internal
{
public static class ChannelReaderExtensions
{
/// <summary>Asynchronously reads an item from the channel.</summary>
/// <param name="channel">The channel</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the read operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous read operation.</returns>
public static ValueTask<T> ReadAsync<T>(this ChannelReader<T> channel, CancellationToken cancellationToken = default)
{
try
{
return
cancellationToken.IsCancellationRequested
? new ValueTask<T>(Task.FromCanceled<T>(cancellationToken))
: channel.TryRead(out T item)
? new ValueTask<T>(item)
: ReadAsyncCore(cancellationToken);
}
catch (Exception e)
{
return new ValueTask<T>(Task.FromException<T>(e));
}
async ValueTask<T> ReadAsyncCore(CancellationToken ct)
{
while (await channel.WaitToReadAsync(ct).ConfigureAwait(false))
{
if (channel.TryRead(out T item))
{
return item;
}
}
throw new ChannelClosedException();
}
}
}
}

View File

@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Sockets

View File

@ -1,7 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.Sockets.Features
{

View File

@ -7,7 +7,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="$(MicrosoftAspNetCoreHttpFeaturesPackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
<PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -8,7 +8,7 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Internal;
@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
private readonly ITransportFactory _transportFactory;
private string _connectionId;
private readonly TimeSpan _eventQueueDrainTimeout = TimeSpan.FromSeconds(5);
private ReadableChannel<byte[]> Input => _transportChannel.In;
private WritableChannel<SendMessage> Output => _transportChannel.Out;
private ChannelReader<byte[]> Input => _transportChannel.Input;
private ChannelWriter<SendMessage> Output => _transportChannel.Output;
private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
private readonly TransportType _requestedTransportType = TransportType.All;

View File

@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.Sockets.Client
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -6,7 +6,7 @@ using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.Extensions.Logging;
@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
{
_logger.TransportStopped(_connectionId, t.Exception?.InnerException);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
_application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
@ -123,9 +123,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
var payload = await response.Content.ReadAsByteArrayAsync();
if (payload.Length > 0)
{
while (!_application.Out.TryWrite(payload))
while (!_application.Writer.TryWrite(payload))
{
if (cancellationToken.IsCancellationRequested || !await _application.Out.WaitToWriteAsync(cancellationToken))
if (cancellationToken.IsCancellationRequested || !await _application.Writer.WaitToWriteAsync(cancellationToken))
{
return;
}

View File

@ -22,7 +22,7 @@
<PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
<PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="$(SystemRuntimeCompilerServicesUnsafePackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
<PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -7,7 +7,7 @@ using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.Extensions.Logging;
@ -23,11 +23,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
IList<SendMessage> messages = null;
try
{
while (await application.In.WaitToReadAsync(transportCts.Token))
while (await application.Reader.WaitToReadAsync(transportCts.Token))
{
// Grab as many messages as we can from the channel
messages = new List<SendMessage>();
while (!transportCts.IsCancellationRequested && application.In.TryRead(out SendMessage message))
while (!transportCts.IsCancellationRequested && application.Reader.TryRead(out SendMessage message))
{
messages.Add(message);
}

View File

@ -146,7 +146,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private Span<byte> ConvertBufferToSpan(ReadableBuffer buffer)
private ReadOnlySpan<byte> ConvertBufferToSpan(ReadableBuffer buffer)
{
if (buffer.IsSingleSpan)
{

View File

@ -1,13 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
using Microsoft.Extensions.Logging;
@ -17,6 +18,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
public class ServerSentEventsTransport : ITransport
{
private static readonly MemoryPool _memoryPool = new MemoryPool();
private readonly HttpClient _httpClient;
private readonly ILogger _logger;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
@ -64,7 +66,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.TransportStopped(_connectionId, t.Exception?.InnerException);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
_application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
@ -80,7 +82,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
var stream = await response.Content.ReadAsStreamAsync();
var pipelineReader = stream.AsPipelineReader(cancellationToken);
var pipelineReader = StreamPipeConnection.CreateReader(new PipeOptions(_memoryPool), stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((IPipeReader)reader).CancelPendingRead(), pipelineReader);
try
@ -105,7 +107,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
switch (parseResult)
{
case ServerSentEventsMessageParser.ParseResult.Completed:
_application.Out.TryWrite(buffer);
_application.Writer.TryWrite(buffer);
_parser.Reset();
break;
case ServerSentEventsMessageParser.ParseResult.Incomplete:
@ -139,7 +141,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.TransportStopping(_connectionId);
_transportCts.Cancel();
_application.Out.TryComplete();
_application.Writer.TryComplete();
try
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -7,7 +7,7 @@ using System.Diagnostics;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -70,8 +70,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_webSocket.Dispose();
_logger.TransportStopped(_connectionId, t.Exception?.InnerException);
_application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
_application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
return t;
}).Unwrap();
}
@ -97,7 +97,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
_logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus);
_application.Out.Complete(
_application.Writer.Complete(
receiveResult.CloseStatus == WebSocketCloseStatus.NormalClosure
? null
: new InvalidOperationException(
@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
if (!_transportCts.Token.IsCancellationRequested)
{
_logger.MessageToApp(_connectionId, messageBuffer.Length);
while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
while (await _application.Writer.WaitToWriteAsync(_transportCts.Token))
{
if (_application.Out.TryWrite(messageBuffer))
if (_application.Writer.TryWrite(messageBuffer))
{
incomingMessage.Clear();
break;
@ -173,9 +173,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
try
{
while (await _application.In.WaitToReadAsync(_transportCts.Token))
while (await _application.Reader.WaitToReadAsync(_transportCts.Token))
{
while (_application.In.TryRead(out SendMessage message))
while (_application.Reader.TryRead(out SendMessage message))
{
try
{

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -93,7 +93,7 @@ namespace Microsoft.AspNetCore.Sockets
connection.TransportCapabilities = TransferMode.Text;
// We only need to provide the Input channel since writing to the application is handled through /send.
var sse = new ServerSentEventsTransport(connection.Application.In, connection.ConnectionId, _loggerFactory);
var sse = new ServerSentEventsTransport(connection.Application.Reader, connection.ConnectionId, _loggerFactory);
await DoPersistentConnection(socketDelegate, sse, context, connection);
}
@ -194,7 +194,7 @@ namespace Microsoft.AspNetCore.Sockets
context.Response.RegisterForDispose(timeoutSource);
context.Response.RegisterForDispose(tokenSource);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.In, connection.ConnectionId, _loggerFactory);
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Reader, connection.ConnectionId, _loggerFactory);
// Start the transport
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.Sockets
if (resultTask == connection.ApplicationTask)
{
// Complete the transport (notifying it of the application error if there is one)
connection.Transport.Out.TryComplete(connection.ApplicationTask.Exception);
connection.Transport.Writer.TryComplete(connection.ApplicationTask.Exception);
// Wait for the transport to run
await connection.TransportTask;
@ -408,9 +408,9 @@ namespace Microsoft.AspNetCore.Sockets
}
_logger.ReceivedBytes(connection.ConnectionId, buffer.Length);
while (!connection.Application.Out.TryWrite(buffer))
while (!connection.Application.Writer.TryWrite(buffer))
{
if (!await connection.Application.Out.WaitToWriteAsync())
if (!await connection.Application.Writer.WaitToWriteAsync())
{
return;
}

View File

@ -1,11 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
@ -13,12 +13,12 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public class LongPollingTransport : IHttpTransport
{
private readonly ReadableChannel<byte[]> _application;
private readonly ChannelReader<byte[]> _application;
private readonly ILogger _logger;
private readonly CancellationToken _timeoutToken;
private readonly string _connectionId;
public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
public LongPollingTransport(CancellationToken timeoutToken, ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
_timeoutToken = timeoutToken;
_application = application;

View File

@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
if (nextSliceStart >= payload.Length)
{
payload = Span<byte>.Empty;
payload = ReadOnlySpan<byte>.Empty;
}
else
{

View File

@ -1,11 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
@ -15,11 +15,11 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
{
public class ServerSentEventsTransport : IHttpTransport
{
private readonly ReadableChannel<byte[]> _application;
private readonly ChannelReader<byte[]> _application;
private readonly string _connectionId;
private readonly ILogger _logger;
public ServerSentEventsTransport(ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
public ServerSentEventsTransport(ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
{
_application = application;
_connectionId = connectionId;

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -7,7 +7,7 @@ using System.Diagnostics;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
}
// We're done writing
_application.Out.TryComplete();
_application.Writer.TryComplete();
await socket.CloseOutputAsync(failed ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
@ -160,9 +160,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
}
_logger.MessageToApplication(_connection.ConnectionId, messageBuffer.Length);
while (await _application.Out.WaitToWriteAsync())
while (await _application.Writer.WaitToWriteAsync())
{
if (_application.Out.TryWrite(messageBuffer))
if (_application.Writer.TryWrite(messageBuffer))
{
incomingMessage.Clear();
break;
@ -173,10 +173,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
private async Task StartSending(WebSocket ws)
{
while (await _application.In.WaitToReadAsync())
while (await _application.Reader.WaitToReadAsync())
{
// Get a frame from the application
while (_application.In.TryRead(out var buffer))
while (_application.Reader.TryRead(out var buffer))
{
if (buffer.Length > 0)
{

View File

@ -16,7 +16,7 @@
<PackageReference Include="Microsoft.AspNetCore.Routing" Version="$(MicrosoftAspNetCoreRoutingPackageVersion)" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="$(MicrosoftAspNetCoreWebSocketsPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.SecurityHelper.Sources" PrivateAssets="All" Version="$(MicrosoftExtensionsSecurityHelperSourcesPackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
<PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
<PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
</ItemGroup>

View File

@ -9,7 +9,7 @@ using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.Extensions.Logging;

View File

@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Sockets.Features;
@ -86,21 +86,21 @@ namespace Microsoft.AspNetCore.Sockets
// If the application task is faulted, propagate the error to the transport
if (ApplicationTask?.IsFaulted == true)
{
Transport.Out.TryComplete(ApplicationTask.Exception.InnerException);
Transport.Writer.TryComplete(ApplicationTask.Exception.InnerException);
}
else
{
Transport.Out.TryComplete();
Transport.Writer.TryComplete();
}
// If the transport task is faulted, propagate the error to the application
if (TransportTask?.IsFaulted == true)
{
Application.Out.TryComplete(TransportTask.Exception.InnerException);
Application.Writer.TryComplete(TransportTask.Exception.InnerException);
}
else
{
Application.Out.TryComplete();
Application.Writer.TryComplete();
}
var applicationTask = ApplicationTask ?? Task.CompletedTask;

View File

@ -13,7 +13,7 @@
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="$(MicrosoftAspNetCoreHostingAbstractionsPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsLoggingAbstractionsPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="$(MicrosoftExtensionsDependencyInjectionAbstractionsPackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
<PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,11 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Channels
namespace System.Threading.Channels
{
internal static class ChannelExtensions
{
public static async Task<List<T>> ReadAllAsync<T>(this ReadableChannel<T> channel)
public static async Task<List<T>> ReadAllAsync<T>(this ChannelReader<T> channel)
{
var list = new List<T>();
while (await channel.WaitToReadAsync())

View File

@ -7,7 +7,7 @@ using System.IO;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Encoders;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public TestClient(bool synchronousCallbacks = false, IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, bool addClaimId = false)
{
var options = new ChannelOptimizations { AllowSynchronousContinuations = synchronousCallbacks };
var options = new UnboundedChannelOptions { AllowSynchronousContinuations = synchronousCallbacks };
var transportToApplication = Channel.CreateUnbounded<byte[]>(options);
var applicationToTransport = Channel.CreateUnbounded<byte[]>(options);
@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
using (var memoryStream = new MemoryStream())
{
NegotiationProtocol.WriteMessage(new NegotiationMessage(protocol.Name), memoryStream);
Application.Out.TryWrite(memoryStream.ToArray());
Application.Writer.TryWrite(memoryStream.ToArray());
}
}
@ -149,7 +149,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public async Task<string> SendHubMessageAsync(HubMessage message)
{
var payload = _protocolReaderWriter.WriteMessage(message);
await Application.Out.WriteAsync(payload);
await Application.Writer.WriteAsync(payload);
return message.InvocationId;
}
@ -161,7 +161,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
if (message == null)
{
if (!await Application.In.WaitToReadAsync())
if (!await Application.Reader.WaitToReadAsync())
{
return null;
}
@ -175,7 +175,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public HubMessage TryRead()
{
if (Application.In.TryRead(out var buffer) &&
if (Application.Reader.TryRead(out var buffer) &&
_protocolReaderWriter.ReadMessages(buffer, _invocationBinder, out var messages))
{
return messages[0];
@ -208,4 +208,4 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
}
}
}

View File

@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;

View File

@ -1,11 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
{
@ -17,9 +17,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public async Task CallEcho(string message)
{
@ -40,9 +40,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public async Task CallEcho(string message)
{
@ -63,9 +63,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
public async Task CallEcho(string message)
{
@ -97,12 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
.Take(count);
}
public static ReadableChannel<int> StreamException()
public static ChannelReader<int> StreamException()
{
throw new InvalidOperationException("Error occurred while streaming.");
}
public static ReadableChannel<string> StreamBroken() => null;
public static ChannelReader<string> StreamBroken() => null;
}
public interface ITestHub

View File

@ -7,7 +7,7 @@ using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Features;
@ -268,8 +268,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
{
// The connection is now in the Disconnected state so the Received event for
// this message should not be raised
channel.Out.TryWrite(Array.Empty<byte>());
channel.Out.TryComplete();
channel.Writer.TryWrite(Array.Empty<byte>());
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
@ -313,7 +313,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
@ -330,14 +330,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
await connection.StartAsync();
channel.Out.TryWrite(Array.Empty<byte>());
channel.Writer.TryWrite(Array.Empty<byte>());
// Ensure that the Received callback has been called before attempting the second write
await callbackInvokedTcs.Task.OrTimeout();
channel.Out.TryWrite(Array.Empty<byte>());
channel.Writer.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.In.TryRead(out var message));
Assert.False(channel.Reader.TryRead(out var message));
closedTcs.SetResult(null);
@ -369,7 +369,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
@ -380,10 +380,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
connection.OnReceived(_ => blockReceiveCallbackTcs.Task);
await connection.StartAsync();
channel.Out.TryWrite(Array.Empty<byte>());
channel.Writer.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.In.TryRead(out var message));
Assert.False(channel.Reader.TryRead(out var message));
await connection.DisposeAsync();
}
@ -413,7 +413,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
@ -427,10 +427,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
});
await connection.StartAsync();
channel.Out.TryWrite(Array.Empty<byte>());
channel.Writer.TryWrite(Array.Empty<byte>());
// Ensure that SignalR isn't blocked by the receive callback
Assert.False(channel.In.TryRead(out var message));
Assert.False(channel.Reader.TryRead(out var message));
await connection.DisposeAsync();
}
@ -909,7 +909,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
mockTransport.Setup(t => t.StopAsync())
.Returns(() =>
{
channel.Out.TryComplete();
channel.Writer.TryComplete();
return Task.CompletedTask;
});
mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Binary);

View File

@ -6,7 +6,7 @@ using System.Globalization;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;

View File

@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public ProtocolType Type => ProtocolType.Binary;
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -8,7 +8,7 @@ using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Client.Tests
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
await longPollingTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.Completion.IsCompleted);
Assert.True(transportToConnection.Reader.Completion.IsCompleted);
}
finally
{
@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Client.Tests
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
var data = await transportToConnection.In.ReadAllAsync().OrTimeout();
var data = await transportToConnection.Reader.ReadAllAsync().OrTimeout();
await longPollingTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.Completion.IsCompleted);
Assert.True(transportToConnection.Reader.Completion.IsCompleted);
Assert.Equal(2, data.Count);
Assert.Equal(Encoding.UTF8.GetBytes("Hello"), data[0]);
Assert.Equal(Encoding.UTF8.GetBytes("World"), data[1]);
@ -172,7 +172,7 @@ namespace Microsoft.AspNetCore.Client.Tests
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
var exception =
await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion.OrTimeout());
await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion.OrTimeout());
Assert.Contains(" 500 ", exception.Message);
}
finally
@ -207,16 +207,16 @@ namespace Microsoft.AspNetCore.Client.Tests
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
await connectionToTransport.Out.WriteAsync(new SendMessage());
await connectionToTransport.Writer.WriteAsync(new SendMessage());
await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
// The channel needs to be drained for the Completion task to be completed
while (transportToConnection.In.TryRead(out var message))
while (transportToConnection.Reader.TryRead(out var message))
{
}
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);
var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion);
Assert.Contains(" 500 ", exception.Message);
}
finally
@ -248,12 +248,12 @@ namespace Microsoft.AspNetCore.Client.Tests
var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
connectionToTransport.Out.Complete();
connectionToTransport.Writer.Complete();
await longPollingTransport.Running.OrTimeout();
await longPollingTransport.Running.OrTimeout();
await connectionToTransport.In.Completion.OrTimeout();
await connectionToTransport.Reader.Completion.OrTimeout();
}
finally
{
@ -304,9 +304,9 @@ namespace Microsoft.AspNetCore.Client.Tests
// Pull Messages out of the channel
var messages = new List<byte[]>();
while (await transportToConnection.In.WaitToReadAsync())
while (await transportToConnection.Reader.WaitToReadAsync())
{
while (transportToConnection.In.TryRead(out var message))
while (transportToConnection.Reader.TryRead(out var message))
{
messages.Add(message);
}
@ -358,16 +358,16 @@ namespace Microsoft.AspNetCore.Client.Tests
var tcs2 = new TaskCompletionSource<object>();
// Pre-queue some messages
await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
// Start the transport
await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
connectionToTransport.Out.Complete();
connectionToTransport.Writer.Complete();
await longPollingTransport.Running.OrTimeout();
await connectionToTransport.In.Completion.OrTimeout();
await connectionToTransport.Reader.Completion.OrTimeout();
Assert.Single(sentRequests);
Assert.Equal(new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o', (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d'

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
@ -106,10 +107,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[InlineData(new[] { "data: Hello, World\r\n", ":comment\r\n", "\r\n" }, "Hello, World")]
public async Task ParseMessageAcrossMultipleReadsSuccess(string[] messageParts, string expectedMessage)
{
using (var pipeFactory = new PipeFactory())
var parser = new ServerSentEventsMessageParser();
using (var pool = new MemoryPool())
{
var parser = new ServerSentEventsMessageParser();
var pipe = pipeFactory.Create();
var pipe = new Pipe(new PipeOptions(pool));
byte[] message = null;
ReadCursor consumed = default, examined = default;
@ -152,9 +153,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[InlineData("data: B\r\ndata: SGVs", "bG8sIFdvcmxk\r\n\n\n", "There was an error in the frame format")]
public async Task ParseMessageAcrossMultipleReadsFailure(string encodedMessagePart1, string encodedMessagePart2, string expectedMessage)
{
using (var pipeFactory = new PipeFactory())
using (var pool = new MemoryPool())
{
var pipe = pipeFactory.Create();
var pipe = new Pipe(new PipeOptions(pool));
// Read the first part of the message
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(encodedMessagePart1));
@ -173,7 +174,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var ex = Assert.Throws<FormatException>(() => parser.ParseMessage(result.Buffer, out consumed, out examined, out buffer));
Assert.Equal(expectedMessage, ex.Message);
}
}
@ -181,9 +181,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
[InlineData("data: foo\r\n\r\n", "data: bar\r\n\r\n")]
public async Task ParseMultipleMessagesText(string message1, string message2)
{
using (var pipeFactory = new PipeFactory())
using (var pool = new MemoryPool())
{
var pipe = pipeFactory.Create();
var pipe = new Pipe(new PipeOptions(pool));
// Read the first part of the message
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message1 + message2));

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -8,8 +8,9 @@ using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Client.Tests;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
@ -42,6 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});
@ -83,12 +85,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
{
await Task.Yield();
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!eventStreamCts.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length);
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});
@ -109,7 +113,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
transportActiveTask = sseTransport.Running;
Assert.False(transportActiveTask.IsCompleted);
var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
}
finally
@ -140,6 +144,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var buffer = Encoding.ASCII.GetBytes("data: 3:a");
await stream.WriteAsync(buffer, 0, buffer.Length);
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});
@ -182,6 +187,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
}
@ -201,7 +207,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await eventStreamTcs.Task;
var sendTcs = new TaskCompletionSource<object>();
Assert.True(connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
Assert.True(connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
var exception = await Assert.ThrowsAsync<HttpRequestException>(() => sendTcs.Task.OrTimeout());
Assert.Contains("500", exception.Message);
@ -231,6 +237,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});
@ -246,7 +253,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
await eventStreamTcs.Task.OrTimeout();
connectionToTransport.Out.TryComplete(null);
connectionToTransport.Writer.TryComplete(null);
await sseTransport.Running.OrTimeout();
}
@ -274,7 +281,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
await sseTransport.StartAsync(
new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
await sseTransport.Running.OrTimeout();

View File

@ -7,8 +7,9 @@ using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Formatters;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
@ -34,8 +35,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public Task Closed => _closeTcs.Task;
public Task Started => _started.Task;
public Task Disposed => _disposed.Task;
public ReadableChannel<byte[]> SentMessages => _sentMessages.In;
public WritableChannel<byte[]> ReceivedMessages => _receivedMessages.Out;
public ChannelReader<byte[]> SentMessages => _sentMessages.Reader;
public ChannelWriter<byte[]> ReceivedMessages => _receivedMessages.Writer;
private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
@ -61,9 +62,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
throw new InvalidOperationException("Connection must be started before SendAsync can be called");
}
while (await _sentMessages.Out.WaitToWriteAsync(cancellationToken))
while (await _sentMessages.Writer.WaitToWriteAsync(cancellationToken))
{
if (_sentMessages.Out.TryWrite(data))
if (_sentMessages.Writer.TryWrite(data))
{
return;
}
@ -100,7 +101,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
var json = JsonConvert.SerializeObject(jsonObject, Formatting.None);
var bytes = FormatMessageToArray(Encoding.UTF8.GetBytes(json));
return _receivedMessages.Out.WriteAsync(bytes);
return _receivedMessages.Writer.WriteAsync(bytes);
}
private byte[] FormatMessageToArray(byte[] message)
@ -116,9 +117,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
{
while (!token.IsCancellationRequested)
{
while (await _receivedMessages.In.WaitToReadAsync(token))
while (await _receivedMessages.Reader.WaitToReadAsync(token))
{
while (_receivedMessages.In.TryRead(out var message))
while (_receivedMessages.Reader.TryRead(out var message))
{
ReceiveCallback[] callbackCopies;
lock (_callbacks)

View File

@ -18,7 +18,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
[InlineData("12:Hello, World;", "Hello, World")]
public void ReadTextMessage(string encoded, string payload)
{
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
Assert.True(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message));
Assert.Equal(0, buffer.Length);
@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
public void ReadMultipleMessages()
{
const string encoded = "0:;14:Hello,\r\nWorld!;";
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
var messages = new List<byte[]>();
while (LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message))
@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
[InlineData("5:ABCDE")]
public void ReadIncompleteMessages(string encoded)
{
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
Assert.False(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _));
}
@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
[InlineData("5:ABCDEF", "Missing delimiter ';' after payload")]
public void ReadInvalidMessages(string encoded, string expectedMessage)
{
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
var ex = Assert.Throws<FormatException>(() =>
{
LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);
@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
{
// Invalid because first character is a UTF-8 "continuation" character
// We need to include the ':' so that
ReadOnlyBuffer<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
ReadOnlyMemory<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
var ex = Assert.Throws<FormatException>(() =>
{
LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);

View File

@ -109,7 +109,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
using (var ms = new MemoryStream())
{
BinaryMessageFormatter.WriteMessage(payload, ms);
var buffer = new ReadOnlyBuffer<byte>(ms.ToArray());
var buffer = new ReadOnlyMemory<byte>(ms.ToArray());
Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped));
Assert.Equal(payload, roundtripped.ToArray());
}

View File

@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x0B, 0x41, 0x0A, 0x52, 0x0D, 0x43, 0x0D, 0x0A, 0x3B, 0x44, 0x45, 0x46 }, "A\nR\rC\r\n;DEF")]
public void ReadMessage(byte[] encoded, string payload)
{
ReadOnlyBuffer<byte> span = encoded;
ReadOnlyMemory<byte> span = encoded;
Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
@ -52,7 +52,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
})]
public void ReadBinaryMessage(byte[] encoded, byte[] payload)
{
ReadOnlyBuffer<byte> span = encoded;
ReadOnlyMemory<byte> span = encoded;
Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
Assert.Equal(payload, message.ToArray());
@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF })]
public void BinaryMessageParserThrowsForMessagesOver2GB(byte[] payload)
{
var buffer = new ReadOnlyBuffer<byte>(payload);
var buffer = new ReadOnlyMemory<byte>(payload);
var ex = Assert.Throws<FormatException>(() => BinaryMessageParser.TryParseMessage(ref buffer, out var message));
Assert.Equal("Messages over 2GB in size are not supported.", ex.Message);
}
@ -76,7 +76,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x80 })] // size is cut
public void BinaryMessageParserReturnsFalseForPartialPayloads(byte[] payload)
{
var buffer = new ReadOnlyBuffer<byte>(payload);
var buffer = new ReadOnlyMemory<byte>(payload);
Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
}
@ -90,7 +90,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
/* length: */ 0x0E,
/* body: */ 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x2C, 0x0D, 0x0A, 0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21,
};
ReadOnlyBuffer<byte> buffer = encoded;
ReadOnlyMemory<byte> buffer = encoded;
var messages = new List<byte[]>();
while (BinaryMessageParser.TryParseMessage(ref buffer, out var message))
@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x09, 0x00, 0x00 })] // Not enough data for payload
public void ReadIncompleteMessages(byte[] encoded)
{
ReadOnlyBuffer<byte> buffer = encoded;
ReadOnlyMemory<byte> buffer = encoded;
Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
Assert.Equal(encoded.Length, buffer.Length);
}

View File

@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[Fact]
public void ReadMessage()
{
var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
@ -23,14 +23,14 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[Fact]
public void TryReadingIncompleteMessage()
{
var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC"));
var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC"));
Assert.False(TextMessageParser.TryParseMessage(ref message, out var payload));
}
[Fact]
public void TryReadingMultipleMessages()
{
var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
[Fact]
public void IncompleteTrailingMessage()
{
var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));

View File

@ -4,7 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests;
using Microsoft.AspNetCore.SignalR.Tests.Common;
@ -70,7 +70,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
Assert.False(output2.Reader.TryRead(out var item));
}
}
@ -100,7 +100,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
Assert.False(output2.Reader.TryRead(out var item));
}
}
@ -201,7 +201,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
AssertMessage(output1);
Assert.False(output2.In.TryRead(out var item));
Assert.False(output2.Reader.TryRead(out var item));
}
}
@ -286,7 +286,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
Assert.False(output.In.TryRead(out var item));
Assert.False(output.Reader.TryRead(out var item));
}
}
@ -387,7 +387,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
Assert.False(output.In.TryRead(out var item));
Assert.False(output.Reader.TryRead(out var item));
}
}
@ -417,7 +417,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
Assert.False(output.In.TryRead(out var item));
Assert.False(output.Reader.TryRead(out var item));
}
}
@ -451,7 +451,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
Assert.False(output.In.TryRead(out var item));
Assert.False(output.Reader.TryRead(out var item));
}
}
@ -480,7 +480,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
await manager1.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
AssertMessage(output);
Assert.False(output.In.TryRead(out var item));
Assert.False(output.Reader.TryRead(out var item));
}
}
@ -499,10 +499,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
using (var client = new TestClient())
{
// Force an exception when writing to connection
var output = new Mock<Channel<HubMessage>>();
output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
var writer = new Mock<ChannelWriter<HubMessage>>();
writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
var connection = new HubConnectionContext(output.Object, client.Connection);
var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
await manager2.OnConnectedAsync(connection).OrTimeout();
@ -523,10 +523,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
using (var client = new TestClient())
{
// Force an exception when writing to connection
var output = new Mock<Channel<HubMessage>>();
output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
var writer = new Mock<ChannelWriter<HubMessage>>();
writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
var connection = new HubConnectionContext(output.Object, client.Connection);
var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
await manager.OnConnectedAsync(connection).OrTimeout();
@ -549,10 +549,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
var output2 = Channel.CreateUnbounded<HubMessage>();
// Force an exception when writing to connection
var output = new Mock<Channel<HubMessage>>();
output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
var writer = new Mock<ChannelWriter<HubMessage>>();
writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
var connection1 = new HubConnectionContext(output.Object, client1.Connection);
var connection1 = new HubConnectionContext(new MockChannel(writer.Object), client1.Connection);
var connection2 = new HubConnectionContext(output2, client2.Connection);
await manager.OnConnectedAsync(connection1).OrTimeout();
@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
private void AssertMessage(Channel<HubMessage> channel)
{
Assert.True(channel.In.TryRead(out var item));
Assert.True(channel.Reader.TryRead(out var item));
var message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
@ -583,5 +583,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
private class MyHub : Hub
{
}
private class MockChannel : Channel<HubMessage>
{
public MockChannel(ChannelWriter<HubMessage> writer = null)
{
Writer = writer;
}
}
}
}

View File

@ -1,7 +1,7 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Moq;
@ -29,13 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
Assert.True(output1.In.TryRead(out var item));
Assert.True(output1.Reader.TryRead(out var item));
var message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
Assert.True(output2.In.TryRead(out item));
Assert.True(output2.Reader.TryRead(out item));
message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
@ -63,13 +63,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
Assert.True(output1.In.TryRead(out var item));
Assert.True(output1.Reader.TryRead(out var item));
var message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
Assert.False(output2.In.TryRead(out item));
Assert.False(output2.Reader.TryRead(out item));
}
}
@ -93,13 +93,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout();
Assert.True(output1.In.TryRead(out var item));
Assert.True(output1.Reader.TryRead(out var item));
var message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
Assert.False(output2.In.TryRead(out item));
Assert.False(output2.Reader.TryRead(out item));
}
}
@ -116,7 +116,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await manager.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
Assert.True(output.In.TryRead(out var item));
Assert.True(output.Reader.TryRead(out var item));
var message = Assert.IsType<InvocationMessage>(item);
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
@ -130,11 +130,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
using (var client = new TestClient())
{
// Force an exception when writing to connection
var output = new Mock<Channel<HubMessage>>();
output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
var writer = new Mock<ChannelWriter<HubMessage>>();
writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
var manager = new DefaultHubLifetimeManager<MyHub>();
var connection = new HubConnectionContext(output.Object, client.Connection);
var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
await manager.OnConnectedAsync(connection).OrTimeout();
@ -168,5 +168,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
}
private class MockChannel: Channel<HubMessage>
{
public MockChannel(ChannelWriter<HubMessage> writer = null)
{
Writer = writer;
}
}
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.Sockets;
namespace Microsoft.AspNetCore.SignalR.Tests
@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
public async override Task OnConnectedAsync(ConnectionContext connection)
{
await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
}
}
}

View File

@ -8,7 +8,7 @@ using System.Runtime.Serialization;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Internal;
@ -259,7 +259,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
using (var client = new TestClient())
{
// TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
client.Connection.Transport.In.TryRead(out var item);
client.Connection.Transport.Reader.TryRead(out var item);
var endPointTask = endPoint.OnConnectedAsync(client.Connection);
@ -285,7 +285,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
using (var client = new TestClient())
{
// TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
client.Connection.Transport.In.TryRead(out var item);
client.Connection.Transport.Reader.TryRead(out var item);
await endPoint.OnConnectedAsync(client.Connection).OrTimeout();
}
@ -521,7 +521,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await client.SendInvocationAsync(methodName, nonBlocking: true).OrTimeout();
// Nothing should have been written
Assert.False(client.Application.In.TryRead(out var buffer));
Assert.False(client.Application.Reader.TryRead(out var buffer));
// kill the connection
client.Dispose();
@ -1595,7 +1595,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
return new CountingObservable(count);
}
public ReadableChannel<string> CounterChannel(int count)
public ChannelReader<string> CounterChannel(int count)
{
var channel = Channel.CreateUnbounded<string>();
@ -1603,17 +1603,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
for (int i = 0; i < count; i++)
{
await channel.Out.WriteAsync(i.ToString());
await channel.Writer.WriteAsync(i.ToString());
}
channel.Out.Complete();
channel.Writer.Complete();
});
return channel.In;
return channel.Reader;
}
public ReadableChannel<string> BlockingStream()
public ChannelReader<string> BlockingStream()
{
return Channel.CreateUnbounded<string>().In;
return Channel.CreateUnbounded<string>().Reader;
}
private class CountingObservable : IObservable<string>

View File

@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.Sockets;
@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
[MemberData(nameof(HubProtocols))]
public void DefaultHubProtocolResolverTestsCanCreateSupportedProtocols(IHubProtocol protocol)
{
var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
Assert.IsType(
protocol.GetType(),
new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocol.Name, mockConnection.Object));
@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
[InlineData("dummy")]
public void DefaultHubProtocolResolverThrowsForNotSupportedProtocol(string protocolName)
{
var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
var exception = Assert.Throws<NotSupportedException>(
() => new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocolName, mockConnection.Object));

View File

@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Client;
@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
var webSocketsTransport = new WebSocketsTransport(loggerFactory);
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
TransferMode.Binary, connectionId: string.Empty);
connectionToTransport.Out.TryComplete();
connectionToTransport.Writer.TryComplete();
await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
}
}
@ -82,7 +82,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, transferMode, connectionId: string.Empty);
var sendTcs = new TaskCompletionSource<object>();
connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
try
{
await sendTcs.Task;
@ -99,7 +99,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
// The echo endpoint closes the connection immediately after sending response which should stop the transport
await webSocketsTransport.Running.OrTimeout();
Assert.True(transportToConnection.In.TryRead(out var buffer));
Assert.True(transportToConnection.Reader.TryRead(out var buffer));
Assert.Equal(new byte[] { 0x42 }, buffer);
}
}

View File

@ -82,12 +82,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests
connection.ApplicationTask = Task.Run(async () =>
{
Assert.False(await connection.Transport.In.WaitToReadAsync());
Assert.False(await connection.Transport.Reader.WaitToReadAsync());
});
connection.TransportTask = Task.Run(async () =>
{
Assert.False(await connection.Application.In.WaitToReadAsync());
Assert.False(await connection.Application.Reader.WaitToReadAsync());
});
connectionManager.CloseConnections();
@ -197,7 +197,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
appLifetime.StopApplication();
// Connection should be disposed so this should complete immediately
Assert.False(await connection.Application.Out.WaitToWriteAsync().OrTimeout());
Assert.False(await connection.Application.Writer.WaitToWriteAsync().OrTimeout());
}
private static ConnectionManager CreateConnectionManager(IApplicationLifetime lifetime = null)

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -511,7 +511,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the transport so the poll yields
await connection.Transport.Out.WriteAsync(buffer);
await connection.Transport.Writer.WriteAsync(buffer);
await task;
@ -543,7 +543,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await connection.Application.Out.WriteAsync(buffer);
await connection.Application.Writer.WriteAsync(buffer);
await task;
@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var buffer = Encoding.UTF8.GetBytes("Hello World");
// Write to the application
await connection.Application.Out.WriteAsync(buffer);
await connection.Application.Writer.WriteAsync(buffer);
await task;
@ -606,7 +606,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
await task1.OrTimeout();
// Send a message from the app to complete Task 2
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
await task2.OrTimeout();
@ -775,7 +775,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -853,7 +853,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
}));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -907,7 +907,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
var endPointTask = dispatcher.ExecuteAsync(context, options, app);
await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
await endPointTask.OrTimeout();
@ -1110,7 +1110,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public override Task OnConnectedAsync(ConnectionContext connection)
{
connection.Transport.In.WaitToReadAsync().Wait();
connection.Transport.Reader.WaitToReadAsync().Wait();
return Task.CompletedTask;
}
}
@ -1135,7 +1135,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
while (await connection.Transport.In.WaitToReadAsync())
while (await connection.Transport.Reader.WaitToReadAsync())
{
}
}

View File

@ -1,11 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var context = new DefaultHttpContext();
var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await poll.ProcessRequestAsync(context, context.RequestAborted);
@ -56,9 +56,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var ms = new MemoryStream();
context.Response.Body = ms;
await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await poll.ProcessRequestAsync(context, context.RequestAborted);
@ -76,11 +76,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var ms = new MemoryStream();
context.Response.Body = ms;
await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(" "));
await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("World"));
await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(" "));
await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("World"));
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await poll.ProcessRequestAsync(context, context.RequestAborted);

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -13,12 +13,21 @@ using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Testing.xunit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.Sockets.Tests
{
public class MapEndPointTests
{
private ITestOutputHelper _output;
public MapEndPointTests(ITestOutputHelper output)
{
_output = output;
}
[Fact]
public void MapEndPointFindsAuthAttributeOnEndPoint()
{
@ -40,6 +49,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
});
});
})
.ConfigureLogging(factory =>
{
factory.AddXunit(_output, LogLevel.Trace);
})
.Build();
Assert.Equal(1, authCount);
@ -66,6 +79,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
});
});
})
.ConfigureLogging(factory =>
{
factory.AddXunit(_output, LogLevel.Trace);
})
.Build();
Assert.Equal(1, authCount);
@ -92,6 +109,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
});
});
})
.ConfigureLogging(factory =>
{
factory.AddXunit(_output, LogLevel.Trace);
})
.Build();
Assert.Equal(2, authCount);
@ -102,24 +123,28 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public async Task MapEndPointWithWebSocketSubProtocolSetsProtocol()
{
var host = new WebHostBuilder()
.UseUrls("http://127.0.0.1:0")
.UseKestrel()
.ConfigureServices(services =>
.UseUrls("http://127.0.0.1:0")
.UseKestrel()
.ConfigureServices(services =>
{
services.AddSockets();
services.AddEndPoint<MyEndPoint>();
})
.Configure(app =>
{
app.UseSockets(routes =>
{
services.AddSockets();
services.AddEndPoint<MyEndPoint>();
})
.Configure(app =>
{
app.UseSockets(routes =>
routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
{
routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
{
httpSocketOptions.WebSockets.SubProtocol = "protocol1";
});
httpSocketOptions.WebSockets.SubProtocol = "protocol1";
});
})
.Build();
});
})
.ConfigureLogging(factory =>
{
factory.AddXunit(_output, LogLevel.Trace);
})
.Build();
await host.StartAsync();
@ -140,7 +165,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
while (!await connection.Transport.In.WaitToReadAsync())
while (!await connection.Transport.Reader.WaitToReadAsync())
{
}

View File

@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFrameworks>netcoreapp2.0;net461</TargetFrameworks>
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netcoreapp2.0</TargetFrameworks>
<RuntimeIdentifier Condition="'$(TargetFramework)' != 'netcoreapp2.0'">win7-x64</RuntimeIdentifier>
</PropertyGroup>
@ -21,6 +21,7 @@
<PackageReference Include="Microsoft.AspNetCore.Http" Version="$(MicrosoftAspNetCoreHttpPackageVersion)" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(MicrosoftAspNetCoreServerKestrelPackageVersion)" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Testing" Version="$(MicrosoftExtensionsLoggingTestingPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.SignalR.Tests.Common;
@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var context = new DefaultHttpContext();
var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await sse.ProcessRequestAsync(context, context.RequestAborted);
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
context.Features.Set<IHttpBufferingFeature>(feature);
var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await sse.ProcessRequestAsync(context, context.RequestAborted);
@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[Fact]
public async Task SSEWritesMessages()
{
var channel = Channel.CreateUnbounded<byte[]>(new ChannelOptimizations
var channel = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true
});
@ -62,11 +62,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var task = sse.ProcessRequestAsync(context, context.RequestAborted);
await channel.Out.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
await channel.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
Assert.Equal(":\r\ndata: Hello\r\n\r\n", Encoding.ASCII.GetString(ms.ToArray()));
channel.Out.TryComplete();
channel.Writer.TryComplete();
await task.OrTimeout();
}
@ -83,9 +83,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var ms = new MemoryStream();
context.Response.Body = ms;
await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(message));
await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
Assert.True(channel.Out.TryComplete());
Assert.True(channel.Writer.TryComplete());
await sse.ProcessRequestAsync(context, context.RequestAborted);

View File

@ -1,9 +1,9 @@
using System;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
@ -22,8 +22,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
var clientToServer = Channel.CreateUnbounded<WebSocketMessage>();
var serverToClient = Channel.CreateUnbounded<WebSocketMessage>();
var clientSocket = new WebSocketChannel(serverToClient.In, clientToServer.Out);
var serverSocket = new WebSocketChannel(clientToServer.In, serverToClient.Out);
var clientSocket = new WebSocketChannel(serverToClient.Reader, clientToServer.Writer);
var serverSocket = new WebSocketChannel(clientToServer.Reader, serverToClient.Writer);
Client = clientSocket;
return Task.FromResult<WebSocket>(serverSocket);
@ -35,14 +35,14 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public class WebSocketChannel : WebSocket
{
private readonly ReadableChannel<WebSocketMessage> _input;
private readonly WritableChannel<WebSocketMessage> _output;
private readonly ChannelReader<WebSocketMessage> _input;
private readonly ChannelWriter<WebSocketMessage> _output;
private WebSocketCloseStatus? _closeStatus;
private string _closeStatusDescription;
private WebSocketState _state;
public WebSocketChannel(ReadableChannel<WebSocketMessage> input, WritableChannel<WebSocketMessage> output)
public WebSocketChannel(ChannelReader<WebSocketMessage> input, ChannelWriter<WebSocketMessage> output)
{
_input = input;
_output = output;
@ -209,4 +209,4 @@ namespace Microsoft.AspNetCore.Sockets.Tests
public string CloseStatusDescription { get; set; }
}
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
@ -6,58 +6,68 @@ using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using System.Threading.Channels;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Tests.Common;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.AspNetCore.Sockets.Internal.Transports;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Testing;
using Xunit;
using Xunit.Abstractions;
namespace Microsoft.AspNetCore.Sockets.Tests
{
public class WebSocketsTests
public class WebSocketsTests : LoggedTest
{
public WebSocketsTests(ITestOutputHelper output)
: base(output)
{
}
[Theory]
[InlineData(WebSocketMessageType.Text)]
[InlineData(WebSocketMessageType.Binary)]
public async Task ReceivedFramesAreWrittenToChannel(WebSocketMessageType webSocketMessageType)
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
// Send a frame, then close
await feature.Client.SendAsync(
buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
messageType: webSocketMessageType,
endOfMessage: true,
cancellationToken: CancellationToken.None);
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
var buffer = await applicationSide.In.ReadAsync();
Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
// Send a frame, then close
await feature.Client.SendAsync(
buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
messageType: webSocketMessageType,
endOfMessage: true,
cancellationToken: CancellationToken.None);
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
Assert.True(applicationSide.Out.TryComplete());
var buffer = await applicationSide.Reader.ReadAsync();
Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
// The transport should finish now
await transport;
Assert.True(applicationSide.Writer.TryComplete());
// The connection should close after this, which means the client will get a close frame.
var clientSummary = await client;
// The transport should finish now
await transport;
Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
// The connection should close after this, which means the client will get a close frame.
var clientSummary = await client;
Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
}
}
}
@ -66,256 +76,276 @@ namespace Microsoft.AspNetCore.Sockets.Tests
[InlineData(TransferMode.Binary, WebSocketMessageType.Binary)]
public async Task WebSocketTransportSetsMessageTypeBasedOnTransferModeFeature(TransferMode transferMode, WebSocketMessageType expectedMessageType)
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
var ws = new WebSocketsTransport(new WebSocketOptions(),
transportSide, connectionContext, loggerFactory: new LoggerFactory());
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
// Write to the output channel, and then complete it
await applicationSide.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
Assert.True(applicationSide.Out.TryComplete());
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// The client should finish now, as should the server
var clientSummary = await client;
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
await transport;
// Write to the output channel, and then complete it
await applicationSide.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
Assert.True(applicationSide.Writer.TryComplete());
Assert.Equal(1, clientSummary.Received.Count);
Assert.True(clientSummary.Received[0].EndOfMessage);
Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
// The client should finish now, as should the server
var clientSummary = await client;
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
await transport;
Assert.Equal(1, clientSummary.Received.Count);
Assert.True(clientSummary.Received[0].EndOfMessage);
Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
}
}
}
[Fact]
public async Task TransportFailsWhenClientDisconnectsAbnormally()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
async Task CompleteApplicationAfterTransportCompletes()
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
// Wait until the transport completes so that we can end the application
await applicationSide.In.WaitToReadAsync();
async Task CompleteApplicationAfterTransportCompletes()
{
// Wait until the transport completes so that we can end the application
await applicationSide.Reader.WaitToReadAsync();
// Complete the application so that the connection unwinds without aborting
applicationSide.Out.TryComplete();
// Complete the application so that the connection unwinds without aborting
applicationSide.Writer.TryComplete();
}
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// When the close frame is received, we complete the application so the send
// loop unwinds
_ = CompleteApplicationAfterTransportCompletes();
// Terminate the client to server channel with an exception
feature.Client.SendAbort();
// Wait for the transport
await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
var summary = await client.OrTimeout();
Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
}
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// When the close frame is received, we complete the application so the send
// loop unwinds
_ = CompleteApplicationAfterTransportCompletes();
// Terminate the client to server channel with an exception
feature.Client.SendAbort();
// Wait for the transport
await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
var summary = await client.OrTimeout();
Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
}
}
[Fact]
public async Task ClientReceivesInternalServerErrorWhenTheApplicationFails()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
// Fail in the app
Assert.True(applicationSide.Out.TryComplete(new InvalidOperationException("Catastrophic failure.")));
var clientSummary = await client.OrTimeout();
Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Close from the client
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
// Fail in the app
Assert.True(applicationSide.Writer.TryComplete(new InvalidOperationException("Catastrophic failure.")));
var clientSummary = await client.OrTimeout();
Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
Assert.Equal("Catastrophic failure.", ex.Message);
// Close from the client
await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
Assert.Equal("Catastrophic failure.", ex.Message);
}
}
}
[Fact]
public async Task TransportClosesOnCloseTimeoutIfClientDoesNotSendCloseFrame()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var options = new WebSocketOptions()
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
CloseTimeout = TimeSpan.FromSeconds(1)
};
var options = new WebSocketOptions()
{
CloseTimeout = TimeSpan.FromSeconds(1)
};
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
// End the app
applicationSide.Dispose();
// End the app
applicationSide.Dispose();
await transport.OrTimeout(TimeSpan.FromSeconds(10));
await transport.OrTimeout(TimeSpan.FromSeconds(10));
// Now we're closed
Assert.Equal(WebSocketState.Aborted, serverSocket.State);
// Now we're closed
Assert.Equal(WebSocketState.Aborted, serverSocket.State);
serverSocket.Dispose();
serverSocket.Dispose();
}
}
}
[Fact]
public async Task TransportFailsOnTimeoutWithErrorWhenApplicationFailsAndClientDoesNotSendCloseFrame()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var options = new WebSocketOptions
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
CloseTimeout = TimeSpan.FromSeconds(1)
};
var options = new WebSocketOptions
{
CloseTimeout = TimeSpan.FromSeconds(1)
};
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// fail the client to server channel
applicationToTransport.Out.TryComplete(new Exception());
// fail the client to server channel
applicationToTransport.Writer.TryComplete(new Exception());
await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
Assert.Equal(WebSocketState.Aborted, serverSocket.State);
Assert.Equal(WebSocketState.Aborted, serverSocket.State);
}
}
}
[Fact]
public async Task ServerGracefullyClosesWhenApplicationEndsThenClientSendsCloseFrame()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var options = new WebSocketOptions
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
// We want to verify behavior without timeout affecting it
CloseTimeout = TimeSpan.FromSeconds(20)
};
var options = new WebSocketOptions
{
// We want to verify behavior without timeout affecting it
CloseTimeout = TimeSpan.FromSeconds(20)
};
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// close the client to server channel
applicationToTransport.Out.TryComplete();
// close the client to server channel
applicationToTransport.Writer.TryComplete();
_ = await client.OrTimeout();
_ = await client.OrTimeout();
await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
await transport.OrTimeout();
await transport.OrTimeout();
Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
}
}
}
[Fact]
public async Task ServerGracefullyClosesWhenClientSendsCloseFrameThenApplicationEnds()
{
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
using (StartLog(out var loggerFactory))
{
var options = new WebSocketOptions
var transportToApplication = Channel.CreateUnbounded<byte[]>();
var applicationToTransport = Channel.CreateUnbounded<byte[]>();
using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
using (var feature = new TestWebSocketConnectionFeature())
{
// We want to verify behavior without timeout affecting it
CloseTimeout = TimeSpan.FromSeconds(20)
};
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
var options = new WebSocketOptions
{
// We want to verify behavior without timeout affecting it
CloseTimeout = TimeSpan.FromSeconds(20)
};
var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
var serverSocket = await feature.AcceptAsync();
// Give the server socket to the transport and run it
var transport = ws.ProcessSocketAsync(serverSocket);
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
// Run the client socket
var client = feature.Client.ExecuteAndCaptureFramesAsync();
await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
// close the client to server channel
applicationToTransport.Out.TryComplete();
// close the client to server channel
applicationToTransport.Writer.TryComplete();
_ = await client.OrTimeout();
_ = await client.OrTimeout();
await transport.OrTimeout();
await transport.OrTimeout();
Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
}
}
}
}