Tackling some low hanging performance fruit (#1470)
* Tackling some low hanging performance fruit - Use native Memory/Span APIs on Stream and WebSocket in .NET Core 2.1 - Remove double copying in formatters - Implemented custom HttpContent over ReadOnlyBuffer<byte>
This commit is contained in:
parent
bfa2df1fc6
commit
c8d4cf689f
|
|
@ -23,14 +23,16 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
|
|||
var buffer = new byte[MessageLength];
|
||||
Random.NextBytes(buffer);
|
||||
var output = new MemoryStream();
|
||||
BinaryMessageFormatter.WriteMessage(buffer, output);
|
||||
BinaryMessageFormatter.WriteLengthPrefix(buffer.Length, output);
|
||||
output.Write(buffer, 0, buffer.Length);
|
||||
|
||||
_binaryInput = output.ToArray();
|
||||
|
||||
buffer = new byte[MessageLength];
|
||||
Random.NextBytes(buffer);
|
||||
output = new MemoryStream();
|
||||
TextMessageFormatter.WriteMessage(buffer, output);
|
||||
output.Write(buffer, 0, buffer.Length);
|
||||
TextMessageFormatter.WriteRecordSeparator(output);
|
||||
|
||||
_textInput = output.ToArray();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,10 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets.Http.Internal
|
||||
namespace System.IO.Pipelines
|
||||
{
|
||||
// Write only stream implementation for efficiently writing bytes from the request body
|
||||
internal class PipeWriterStream : Stream
|
||||
|
|
@ -62,5 +61,14 @@ namespace Microsoft.AspNetCore.Sockets.Http.Internal
|
|||
Write(buffer, offset, count);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
#if NETCOREAPP2_1
|
||||
public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_pipeWriter.Write(source.Span);
|
||||
_length += source.Length;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace System.IO
|
||||
{
|
||||
internal static class StreamExtensions
|
||||
{
|
||||
public static async Task WriteAsync(this Stream stream, ReadOnlyBuffer<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// REVIEW: Should we special case IsSingleSegment here?
|
||||
foreach (var segment in buffer)
|
||||
{
|
||||
#if NETCOREAPP2_1
|
||||
await stream.WriteAsync(segment, cancellationToken);
|
||||
#else
|
||||
var isArray = MemoryMarshal.TryGetArray(segment, out var arraySegment);
|
||||
// We're using the managed memory pool which is backed by managed buffers
|
||||
Debug.Assert(isArray);
|
||||
await stream.WriteAsync(arraySegment.Array, arraySegment.Offset, arraySegment.Count, cancellationToken);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace System.Net.WebSockets
|
||||
{
|
||||
internal static class WebSocketExtensions
|
||||
{
|
||||
public static Task SendAsync(this WebSocket webSocket, ReadOnlyBuffer<byte> buffer, WebSocketMessageType webSocketMessageType, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// TODO: Consider chunking writes here if we get a multi segment buffer
|
||||
#if NETCOREAPP2_1
|
||||
if (buffer.IsSingleSegment)
|
||||
{
|
||||
return webSocket.SendAsync(buffer.First, webSocketMessageType, endOfMessage: true, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
return webSocket.SendAsync(buffer.ToArray(), webSocketMessageType, endOfMessage: true, cancellationToken);
|
||||
}
|
||||
#else
|
||||
if (buffer.IsSingleSegment)
|
||||
{
|
||||
var isArray = MemoryMarshal.TryGetArray(buffer.First, out var segment);
|
||||
Debug.Assert(isArray);
|
||||
return webSocket.SendAsync(segment, webSocketMessageType, endOfMessage: true, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
return webSocket.SendAsync(new ArraySegment<byte>(buffer.ToArray()), webSocketMessageType, true, cancellationToken);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,21 +2,23 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.IO;
|
||||
|
||||
namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
|
||||
{
|
||||
public static class BinaryMessageFormatter
|
||||
{
|
||||
public unsafe static void WriteMessage(ReadOnlySpan<byte> payload, Stream output)
|
||||
public static void WriteLengthPrefix(long length, Stream output)
|
||||
{
|
||||
// This code writes length prefix of the message as a VarInt. Read the comment in
|
||||
// the BinaryMessageParser.TryParseMessage for details.
|
||||
|
||||
var lenBuffer = stackalloc byte[5];
|
||||
#if NETCOREAPP2_1
|
||||
Span<byte> lenBuffer = stackalloc byte[5];
|
||||
#else
|
||||
var lenBuffer = new byte[5];
|
||||
#endif
|
||||
var lenNumBytes = 0;
|
||||
var length = payload.Length;
|
||||
do
|
||||
{
|
||||
ref var current = ref lenBuffer[lenNumBytes];
|
||||
|
|
@ -30,14 +32,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
|
|||
}
|
||||
while (length > 0);
|
||||
|
||||
var buffer = ArrayPool<byte>.Shared.Rent(lenNumBytes + payload.Length);
|
||||
var bufferSpan = buffer.AsSpan();
|
||||
|
||||
new ReadOnlySpan<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
|
||||
bufferSpan = bufferSpan.Slice(lenNumBytes);
|
||||
payload.CopyTo(bufferSpan);
|
||||
output.Write(buffer, 0, lenNumBytes + payload.Length);
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
#if NETCOREAPP2_1
|
||||
output.Write(lenBuffer.Slice(0, lenNumBytes));
|
||||
#else
|
||||
output.Write(lenBuffer, 0, lenNumBytes);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.IO;
|
||||
|
||||
namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
|
||||
|
|
@ -13,11 +11,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
|
|||
// will not occur (is not a valid character) and therefore it is safe to not escape it
|
||||
internal static readonly byte RecordSeparator = 0x1e;
|
||||
|
||||
public static void WriteMessage(ReadOnlySpan<byte> payload, Stream output)
|
||||
public static void WriteRecordSeparator(Stream output)
|
||||
{
|
||||
var buffer = ArrayPool<byte>.Shared.Rent(payload.Length);
|
||||
payload.CopyTo(buffer);
|
||||
output.Write(buffer, 0, payload.Length);
|
||||
output.WriteByte(RecordSeparator);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Runtime.ExceptionServices;
|
||||
using System.Text;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Formatters;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Newtonsoft.Json;
|
||||
|
|
@ -15,6 +16,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
|||
{
|
||||
public class JsonHubProtocol : IHubProtocol
|
||||
{
|
||||
private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
|
||||
|
||||
private const string ResultPropertyName = "result";
|
||||
private const string ItemPropertyName = "item";
|
||||
private const string InvocationIdPropertyName = "invocationId";
|
||||
|
|
@ -59,13 +62,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
|||
|
||||
public void WriteMessage(HubMessage message, Stream output)
|
||||
{
|
||||
using (var memoryStream = new MemoryStream())
|
||||
{
|
||||
WriteMessageCore(message, memoryStream);
|
||||
memoryStream.Flush();
|
||||
|
||||
TextMessageFormatter.WriteMessage(memoryStream.ToArray(), output);
|
||||
}
|
||||
WriteMessageCore(message, output);
|
||||
TextMessageFormatter.WriteRecordSeparator(output);
|
||||
}
|
||||
|
||||
private HubMessage ParseMessage(Stream input, IInvocationBinder binder)
|
||||
|
|
@ -135,7 +133,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
|||
|
||||
private void WriteMessageCore(HubMessage message, Stream stream)
|
||||
{
|
||||
using (var writer = new JsonTextWriter(new StreamWriter(stream)))
|
||||
using (var writer = new JsonTextWriter(new StreamWriter(stream, _utf8NoBom, 1024, leaveOpen: true)))
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
switch (message)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.Buffers;
|
||||
using System.Collections;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using Microsoft.AspNetCore.SignalR.Internal.Formatters;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
|
|
@ -13,22 +14,21 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
|||
{
|
||||
public static class NegotiationProtocol
|
||||
{
|
||||
private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
|
||||
|
||||
private const string ProtocolPropertyName = "protocol";
|
||||
|
||||
public static void WriteMessage(NegotiationMessage negotiationMessage, Stream output)
|
||||
{
|
||||
using (var memoryStream = new MemoryStream())
|
||||
using (var writer = new JsonTextWriter(new StreamWriter(output, _utf8NoBom, 1024, leaveOpen: true)))
|
||||
{
|
||||
using (var writer = new JsonTextWriter(new StreamWriter(memoryStream)))
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
writer.WritePropertyName(ProtocolPropertyName);
|
||||
writer.WriteValue(negotiationMessage.Protocol);
|
||||
writer.WriteEndObject();
|
||||
}
|
||||
|
||||
TextMessageFormatter.WriteMessage(memoryStream.ToArray(), output);
|
||||
writer.WriteStartObject();
|
||||
writer.WritePropertyName(ProtocolPropertyName);
|
||||
writer.WriteValue(negotiationMessage.Protocol);
|
||||
writer.WriteEndObject();
|
||||
}
|
||||
|
||||
TextMessageFormatter.WriteRecordSeparator(output);
|
||||
}
|
||||
|
||||
public static bool TryParseMessage(ReadOnlySpan<byte> input, out NegotiationMessage negotiationMessage)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
<PropertyGroup>
|
||||
<Description>Common serialiation primitives for SignalR Clients Servers</Description>
|
||||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<TargetFrameworks>netstandard2.0;netcoreapp2.1</TargetFrameworks>
|
||||
<RootNamespace>Microsoft.AspNetCore.SignalR</RootNamespace>
|
||||
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
|
||||
</PropertyGroup>
|
||||
|
|
|
|||
|
|
@ -229,10 +229,22 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
|
|||
|
||||
public void WriteMessage(HubMessage message, Stream output)
|
||||
{
|
||||
// We're writing data into the memoryStream so that we can get the length prefix
|
||||
using (var memoryStream = new MemoryStream())
|
||||
{
|
||||
WriteMessageCore(message, memoryStream);
|
||||
BinaryMessageFormatter.WriteMessage(new ReadOnlySpan<byte>(memoryStream.ToArray()), output);
|
||||
if (memoryStream.TryGetBuffer(out var buffer))
|
||||
{
|
||||
// Write the buffer directly
|
||||
BinaryMessageFormatter.WriteLengthPrefix(buffer.Count, output);
|
||||
output.Write(buffer.Array, buffer.Offset, buffer.Count);
|
||||
}
|
||||
else
|
||||
{
|
||||
BinaryMessageFormatter.WriteLengthPrefix(memoryStream.Length, output);
|
||||
memoryStream.Position = 0;
|
||||
memoryStream.CopyTo(output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -123,12 +123,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
{
|
||||
_logger.ReceivedMessages();
|
||||
|
||||
// TODO: Use CopyToAsync here
|
||||
var payload = await response.Content.ReadAsByteArrayAsync();
|
||||
if (payload.Length > 0)
|
||||
{
|
||||
await _application.Output.WriteAsync(payload);
|
||||
}
|
||||
var stream = new PipeWriterStream(_application.Output);
|
||||
await response.Content.CopyToAsync(stream);
|
||||
await _application.Output.FlushAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,11 +2,14 @@
|
|||
|
||||
<PropertyGroup>
|
||||
<Description>Client for ASP.NET Core SignalR</Description>
|
||||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<TargetFrameworks>netstandard2.0;netcoreapp2.1</TargetFrameworks>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="..\Common\ForceAsyncAwaiter.cs" Link="ForceAsyncAwaiter.cs" />
|
||||
<Compile Include="..\Common\PipeWriterStream.cs" Link="PipeWriterStream.cs" />
|
||||
<Compile Include="..\Common\WebSocketExtensions.cs" Link="WebSocketExtensions.cs" />
|
||||
<Compile Include="..\Common\StreamExtensions.cs" Link="StreamExtensions.cs" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets.Client
|
||||
{
|
||||
public struct SendMessage
|
||||
{
|
||||
public byte[] Payload { get; }
|
||||
public TaskCompletionSource<object> SendResult { get; }
|
||||
|
||||
public SendMessage(byte[] payload, TaskCompletionSource<object> result)
|
||||
{
|
||||
Payload = payload;
|
||||
SendResult = result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,10 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
|
@ -39,8 +42,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
var request = new HttpRequestMessage(HttpMethod.Post, sendUrl);
|
||||
PrepareHttpRequest(request, httpOptions);
|
||||
|
||||
// TODO: Use a custom stream implementation over the ReadOnlyBuffer<byte>
|
||||
request.Content = new ByteArrayContent(buffer.ToArray());
|
||||
request.Content = new ReadOnlyBufferContent(buffer);
|
||||
|
||||
var response = await httpClient.SendAsync(request, transportCts.Token);
|
||||
response.EnsureSuccessStatusCode();
|
||||
|
|
@ -96,5 +98,26 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
request.Headers.Add("Authorization", $"Bearer {httpOptions.AccessTokenFactory()}");
|
||||
}
|
||||
}
|
||||
|
||||
private class ReadOnlyBufferContent : HttpContent
|
||||
{
|
||||
private readonly ReadOnlyBuffer<byte> _buffer;
|
||||
|
||||
public ReadOnlyBufferContent(ReadOnlyBuffer<byte> buffer)
|
||||
{
|
||||
_buffer = buffer;
|
||||
}
|
||||
|
||||
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
|
||||
{
|
||||
return stream.WriteAsync(_buffer);
|
||||
}
|
||||
|
||||
protected override bool TryComputeLength(out long length)
|
||||
{
|
||||
length = _buffer.Length;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,8 +2,10 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.WebSockets;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Sockets.Client.Http;
|
||||
|
|
@ -102,18 +104,23 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
{
|
||||
var memory = _application.Output.GetMemory();
|
||||
|
||||
// REVIEW: Use new Memory<byte> websocket APIs on .NET Core 2.1
|
||||
memory.TryGetArray(out var arraySegment);
|
||||
#if NETCOREAPP2_1
|
||||
var receiveResult = await _webSocket.ReceiveAsync(memory, _receiveCts.Token);
|
||||
#else
|
||||
var isArray = memory.TryGetArray(out var arraySegment);
|
||||
Debug.Assert(isArray);
|
||||
|
||||
// Exceptions are handled above where the send and receive tasks are being run.
|
||||
var receiveResult = await _webSocket.ReceiveAsync(arraySegment, _receiveCts.Token);
|
||||
#endif
|
||||
|
||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
_logger.WebSocketClosed(receiveResult.CloseStatus);
|
||||
_logger.WebSocketClosed(_webSocket.CloseStatus);
|
||||
|
||||
if (receiveResult.CloseStatus != WebSocketCloseStatus.NormalClosure)
|
||||
if (_webSocket.CloseStatus != WebSocketCloseStatus.NormalClosure)
|
||||
{
|
||||
throw new InvalidOperationException($"Websocket closed with error: {receiveResult.CloseStatus}.");
|
||||
throw new InvalidOperationException($"Websocket closed with error: {_webSocket.CloseStatus}.");
|
||||
}
|
||||
|
||||
return;
|
||||
|
|
@ -162,7 +169,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
|
|||
{
|
||||
_logger.ReceivedFromApp(buffer.Length);
|
||||
|
||||
await _webSocket.SendAsync(new ArraySegment<byte>(buffer.ToArray()), webSocketMessageType, true, _transportCts.Token);
|
||||
await _webSocket.SendAsync(buffer, webSocketMessageType, _transportCts.Token);
|
||||
}
|
||||
else if (result.IsCompleted)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ using Microsoft.AspNetCore.Http;
|
|||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Protocols;
|
||||
using Microsoft.AspNetCore.Sockets.Features;
|
||||
using Microsoft.AspNetCore.Sockets.Http.Internal;
|
||||
using Microsoft.AspNetCore.Sockets.Internal;
|
||||
using Microsoft.AspNetCore.Sockets.Internal.Transports;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.IO.Pipelines;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
|
|
@ -52,13 +53,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
|
||||
try
|
||||
{
|
||||
foreach (var segment in buffer)
|
||||
{
|
||||
var isArray = MemoryMarshal.TryGetArray(segment, out var arraySegment);
|
||||
// We're using the managed memory pool which is backed by managed buffers
|
||||
Debug.Assert(isArray);
|
||||
await context.Response.Body.WriteAsync(arraySegment.Array, arraySegment.Offset, arraySegment.Count);
|
||||
}
|
||||
await context.Response.Body.WriteAsync(buffer);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
|||
|
|
@ -3,7 +3,10 @@
|
|||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
||||
{
|
||||
|
|
@ -14,7 +17,31 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
|
||||
private const byte LineFeed = (byte)'\n';
|
||||
|
||||
public static void WriteMessage(ReadOnlySpan<byte> payload, MemoryStream output)
|
||||
public static async Task WriteMessageAsync(ReadOnlyBuffer<byte> payload, Stream output)
|
||||
{
|
||||
var ms = new MemoryStream();
|
||||
|
||||
// TODO: There are 2 improvements to be made here
|
||||
// 1. Don't convert the entire payload into an array if if's multi-segmented.
|
||||
// 2. Don't allocate the memory stream unless the payload contains \n. If it doesn't we can just write the buffers directly
|
||||
// to the stream without modification. While it does mean that there will be smaller writes, should be fine for the most part
|
||||
// since we're using reasonably sized buffers.
|
||||
|
||||
if (payload.IsSingleSegment)
|
||||
{
|
||||
WriteMessage(payload.First, ms);
|
||||
}
|
||||
else
|
||||
{
|
||||
WriteMessage(payload.ToArray(), ms);
|
||||
}
|
||||
|
||||
ms.Position = 0;
|
||||
|
||||
await ms.CopyToAsync(output);
|
||||
}
|
||||
|
||||
public static void WriteMessage(ReadOnlyMemory<byte> payload, Stream output)
|
||||
{
|
||||
// Write the payload
|
||||
WritePayload(payload, output);
|
||||
|
|
@ -23,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
output.Write(Newline, 0, Newline.Length);
|
||||
}
|
||||
|
||||
private static void WritePayload(ReadOnlySpan<byte> payload, Stream output)
|
||||
private static void WritePayload(ReadOnlyMemory<byte> payload, Stream output)
|
||||
{
|
||||
// Short-cut for empty payload
|
||||
if (payload.Length == 0)
|
||||
|
|
@ -45,8 +72,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
var keepWriting = true;
|
||||
while (keepWriting)
|
||||
{
|
||||
var span = payload.Span;
|
||||
// Seek to the end of buffer or newline
|
||||
var sliceEnd = payload.IndexOf(LineFeed);
|
||||
var sliceEnd = span.IndexOf(LineFeed);
|
||||
var nextSliceStart = sliceEnd + 1;
|
||||
if (sliceEnd < 0)
|
||||
{
|
||||
|
|
@ -56,7 +84,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
// This is the last span
|
||||
keepWriting = false;
|
||||
}
|
||||
if (sliceEnd > 0 && payload[sliceEnd - 1] == '\r')
|
||||
if (sliceEnd > 0 && span[sliceEnd - 1] == '\r')
|
||||
{
|
||||
sliceEnd--;
|
||||
}
|
||||
|
|
@ -65,7 +93,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
|
||||
if (nextSliceStart >= payload.Length)
|
||||
{
|
||||
payload = ReadOnlySpan<byte>.Empty;
|
||||
payload = ReadOnlyMemory<byte>.Empty;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -76,15 +104,20 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
|
|||
}
|
||||
}
|
||||
|
||||
private static void WriteLine(ReadOnlySpan<byte> payload, Stream output)
|
||||
private static void WriteLine(ReadOnlyMemory<byte> payload, Stream output)
|
||||
{
|
||||
output.Write(DataPrefix, 0, DataPrefix.Length);
|
||||
|
||||
var buffer = ArrayPool<byte>.Shared.Rent(payload.Length);
|
||||
payload.CopyTo(buffer);
|
||||
output.Write(buffer, 0, payload.Length);
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
|
||||
#if NETCOREAPP2_1
|
||||
output.Write(payload.Span);
|
||||
#else
|
||||
if (payload.Length > 0)
|
||||
{
|
||||
var isArray = MemoryMarshal.TryGetArray(payload, out var segment);
|
||||
Debug.Assert(isArray);
|
||||
output.Write(segment.Array, segment.Offset, segment.Count);
|
||||
}
|
||||
#endif
|
||||
output.Write(Newline, 0, Newline.Length);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,12 +53,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
{
|
||||
if (!buffer.IsEmpty)
|
||||
{
|
||||
var ms = new MemoryStream();
|
||||
_logger.SSEWritingMessage(buffer.Length);
|
||||
// Don't create a copy using ToArray every time
|
||||
ServerSentEventsMessageFormatter.WriteMessage(buffer.ToArray(), ms);
|
||||
ms.Seek(0, SeekOrigin.Begin);
|
||||
await ms.CopyToAsync(context.Response.Body);
|
||||
|
||||
await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, context.Response.Body);
|
||||
}
|
||||
else if (result.IsCompleted)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -2,10 +2,10 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.WebSockets;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
|
|
@ -105,7 +105,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
trigger.GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
private async Task<WebSocketReceiveResult> StartReceiving(WebSocket socket)
|
||||
private async Task StartReceiving(WebSocket socket)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
|
@ -113,14 +113,18 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
{
|
||||
var memory = _application.Output.GetMemory();
|
||||
|
||||
// REVIEW: Use new Memory<byte> websocket APIs on .NET Core 2.1
|
||||
memory.TryGetArray(out var arraySegment);
|
||||
#if NETCOREAPP2_1
|
||||
var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None);
|
||||
#else
|
||||
var isArray = memory.TryGetArray(out var arraySegment);
|
||||
Debug.Assert(isArray);
|
||||
|
||||
// Exceptions are handled above where the send and receive tasks are being run.
|
||||
var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None);
|
||||
#endif
|
||||
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
return receiveResult;
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.MessageReceived(receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);
|
||||
|
|
@ -163,7 +167,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
|
|||
|
||||
if (WebSocketCanSend(ws))
|
||||
{
|
||||
await ws.SendAsync(new ArraySegment<byte>(buffer.ToArray()), webSocketMessageType, endOfMessage: true, cancellationToken: CancellationToken.None);
|
||||
await ws.SendAsync(buffer, webSocketMessageType);
|
||||
}
|
||||
}
|
||||
catch (WebSocketException socketException) when (!WebSocketCanSend(ws))
|
||||
|
|
|
|||
|
|
@ -2,9 +2,15 @@
|
|||
|
||||
<PropertyGroup>
|
||||
<Description>Components for providing real-time bi-directional communication across the Web.</Description>
|
||||
<TargetFramework>netstandard2.0</TargetFramework>
|
||||
<TargetFrameworks>netstandard2.0;netcoreapp2.1</TargetFrameworks>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="..\Common\PipeWriterStream.cs" Link="PipeWriterStream.cs" />
|
||||
<Compile Include="..\Common\WebSocketExtensions.cs" Link="WebSocketExtensions.cs" />
|
||||
<Compile Include="..\Common\StreamExtensions.cs" Link="StreamExtensions.cs" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Microsoft.AspNetCore.Sockets\Microsoft.AspNetCore.Sockets.csproj" />
|
||||
<ProjectReference Include="..\Microsoft.AspNetCore.Sockets.Common.Http\Microsoft.AspNetCore.Sockets.Common.Http.csproj" />
|
||||
|
|
|
|||
|
|
@ -115,7 +115,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
|
|||
private byte[] FormatMessageToArray(byte[] message)
|
||||
{
|
||||
var output = new MemoryStream();
|
||||
TextMessageFormatter.WriteMessage(message, output);
|
||||
output.Write(message, 0, message.Length);
|
||||
TextMessageFormatter.WriteRecordSeparator(output);
|
||||
return output.ToArray();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
|
|||
var output = new MemoryStream(); // Use small chunks to test Advance/Enlarge and partial payload writing
|
||||
foreach (var message in messages)
|
||||
{
|
||||
BinaryMessageFormatter.WriteMessage(message, output);
|
||||
BinaryMessageFormatter.WriteLengthPrefix(message.Length, output);
|
||||
output.Write(message, 0, message.Length);
|
||||
}
|
||||
|
||||
Assert.Equal(expectedEncoding, output.ToArray());
|
||||
|
|
@ -77,7 +78,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
|
|||
output.Seek(offset, SeekOrigin.Begin);
|
||||
}
|
||||
|
||||
BinaryMessageFormatter.WriteMessage(payload, output);
|
||||
BinaryMessageFormatter.WriteLengthPrefix(payload.Length, output);
|
||||
output.Write(payload, 0, payload.Length);
|
||||
|
||||
Assert.Equal(encoded, output.ToArray().Skip(offset));
|
||||
}
|
||||
|
|
@ -97,7 +99,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
|
|||
output.Seek(offset, SeekOrigin.Begin);
|
||||
}
|
||||
|
||||
BinaryMessageFormatter.WriteMessage(message, output);
|
||||
BinaryMessageFormatter.WriteLengthPrefix(message.Length, output);
|
||||
output.Write(message, 0, message.Length);
|
||||
|
||||
Assert.Equal(encoded, output.ToArray().Skip(offset));
|
||||
}
|
||||
|
|
@ -108,7 +111,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
|
|||
{
|
||||
using (var ms = new MemoryStream())
|
||||
{
|
||||
BinaryMessageFormatter.WriteMessage(payload, ms);
|
||||
BinaryMessageFormatter.WriteLengthPrefix(payload.Length, ms);
|
||||
ms.Write(payload, 0, payload.Length);
|
||||
var buffer = new ReadOnlySpan<byte>(ms.ToArray());
|
||||
Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped));
|
||||
Assert.Equal(payload, roundtripped.ToArray());
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
|
|||
{
|
||||
using (var ms = new MemoryStream())
|
||||
{
|
||||
TextMessageFormatter.WriteMessage(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes("ABC")), ms);
|
||||
var buffer = Encoding.UTF8.GetBytes("ABC");
|
||||
ms.Write(buffer, 0, buffer.Length);
|
||||
TextMessageFormatter.WriteRecordSeparator(ms);
|
||||
Assert.Equal("ABC\u001e", Encoding.UTF8.GetString(ms.ToArray()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -206,7 +206,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
|
|||
private static byte[] FormatMessageToArray(byte[] message)
|
||||
{
|
||||
var output = new MemoryStream();
|
||||
TextMessageFormatter.WriteMessage(message, output);
|
||||
output.Write(message, 0, message.Length);
|
||||
TextMessageFormatter.WriteRecordSeparator(output);
|
||||
return output.ToArray();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -485,8 +485,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
|
|||
{
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
BinaryMessageFormatter.WriteMessage(input, stream);
|
||||
stream.Flush();
|
||||
BinaryMessageFormatter.WriteLengthPrefix(input.Length, stream);
|
||||
stream.Write(input, 0, input.Length);
|
||||
return stream.ToArray();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue