From c8d4cf689f1b239163d50cc974a918c84c5233ed Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 20 Feb 2018 13:50:31 -0800 Subject: [PATCH] Tackling some low hanging performance fruit (#1470) * Tackling some low hanging performance fruit - Use native Memory/Span APIs on Stream and WebSocket in .NET Core 2.1 - Remove double copying in formatters - Implemented custom HttpContent over ReadOnlyBuffer --- .../MessageParserBenchmark.cs | 6 +- .../Internal => Common}/PipeWriterStream.cs | 12 +++- src/Common/StreamExtensions.cs | 30 ++++++++++ src/Common/WebSocketExtensions.cs | 43 +++++++++++++++ .../Formatters/BinaryMessageFormatter.cs | 23 ++++---- .../Formatters/TextMessageFormatter.cs | 7 +-- .../Internal/Protocol/JsonHubProtocol.cs | 14 ++--- .../Internal/Protocol/NegotiationProtocol.cs | 20 +++---- ...Microsoft.AspNetCore.SignalR.Common.csproj | 2 +- .../Protocol/MessagePackHubProtocol.cs | 14 ++++- .../LongPollingTransport.cs | 9 +-- ...soft.AspNetCore.Sockets.Client.Http.csproj | 5 +- .../SendMessage.cs | 19 ------- .../SendUtils.cs | 27 ++++++++- .../WebSocketsTransport.cs | 19 +++++-- .../HttpConnectionDispatcher.cs | 1 - .../Transports/LongPollingTransport.cs | 9 +-- .../ServerSentEventsMessageFormatter.cs | 55 +++++++++++++++---- .../Transports/ServerSentEventsTransport.cs | 7 +-- .../Transports/WebSocketsTransport.cs | 16 ++++-- .../Microsoft.AspNetCore.Sockets.Http.csproj | 8 ++- .../TestConnection.cs | 3 +- .../Formatters/BinaryMessageFormatterTests.cs | 12 ++-- .../Formatters/TextMessageFormatterTests.cs | 4 +- .../Internal/Protocol/JsonHubProtocolTests.cs | 3 +- .../Protocol/MessagePackHubProtocolTests.cs | 4 +- 26 files changed, 256 insertions(+), 116 deletions(-) rename src/{Microsoft.AspNetCore.Sockets.Http/Internal => Common}/PipeWriterStream.cs (85%) create mode 100644 src/Common/StreamExtensions.cs create mode 100644 src/Common/WebSocketExtensions.cs delete mode 100644 src/Microsoft.AspNetCore.Sockets.Client.Http/SendMessage.cs diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs index bbccc88425..a854ae7659 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs @@ -23,14 +23,16 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks var buffer = new byte[MessageLength]; Random.NextBytes(buffer); var output = new MemoryStream(); - BinaryMessageFormatter.WriteMessage(buffer, output); + BinaryMessageFormatter.WriteLengthPrefix(buffer.Length, output); + output.Write(buffer, 0, buffer.Length); _binaryInput = output.ToArray(); buffer = new byte[MessageLength]; Random.NextBytes(buffer); output = new MemoryStream(); - TextMessageFormatter.WriteMessage(buffer, output); + output.Write(buffer, 0, buffer.Length); + TextMessageFormatter.WriteRecordSeparator(output); _textInput = output.ToArray(); } diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/PipeWriterStream.cs b/src/Common/PipeWriterStream.cs similarity index 85% rename from src/Microsoft.AspNetCore.Sockets.Http/Internal/PipeWriterStream.cs rename to src/Common/PipeWriterStream.cs index 4987b06f49..f43f3d68dd 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/PipeWriterStream.cs +++ b/src/Common/PipeWriterStream.cs @@ -4,11 +4,10 @@ using System; using System.Buffers; using System.IO; -using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -namespace Microsoft.AspNetCore.Sockets.Http.Internal +namespace System.IO.Pipelines { // Write only stream implementation for efficiently writing bytes from the request body internal class PipeWriterStream : Stream @@ -62,5 +61,14 @@ namespace Microsoft.AspNetCore.Sockets.Http.Internal Write(buffer, offset, count); return Task.CompletedTask; } + +#if NETCOREAPP2_1 + public override Task WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + _pipeWriter.Write(source.Span); + _length += source.Length; + return Task.CompletedTask; + } +#endif } } diff --git a/src/Common/StreamExtensions.cs b/src/Common/StreamExtensions.cs new file mode 100644 index 0000000000..ba3d9b7e08 --- /dev/null +++ b/src/Common/StreamExtensions.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO +{ + internal static class StreamExtensions + { + public static async Task WriteAsync(this Stream stream, ReadOnlyBuffer buffer, CancellationToken cancellationToken = default) + { + // REVIEW: Should we special case IsSingleSegment here? + foreach (var segment in buffer) + { +#if NETCOREAPP2_1 + await stream.WriteAsync(segment, cancellationToken); +#else + var isArray = MemoryMarshal.TryGetArray(segment, out var arraySegment); + // We're using the managed memory pool which is backed by managed buffers + Debug.Assert(isArray); + await stream.WriteAsync(arraySegment.Array, arraySegment.Offset, arraySegment.Count, cancellationToken); +#endif + } + } + } +} diff --git a/src/Common/WebSocketExtensions.cs b/src/Common/WebSocketExtensions.cs new file mode 100644 index 0000000000..ba674f2223 --- /dev/null +++ b/src/Common/WebSocketExtensions.cs @@ -0,0 +1,43 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.WebSockets +{ + internal static class WebSocketExtensions + { + public static Task SendAsync(this WebSocket webSocket, ReadOnlyBuffer buffer, WebSocketMessageType webSocketMessageType, CancellationToken cancellationToken = default) + { + // TODO: Consider chunking writes here if we get a multi segment buffer +#if NETCOREAPP2_1 + if (buffer.IsSingleSegment) + { + return webSocket.SendAsync(buffer.First, webSocketMessageType, endOfMessage: true, cancellationToken); + } + else + { + return webSocket.SendAsync(buffer.ToArray(), webSocketMessageType, endOfMessage: true, cancellationToken); + } +#else + if (buffer.IsSingleSegment) + { + var isArray = MemoryMarshal.TryGetArray(buffer.First, out var segment); + Debug.Assert(isArray); + return webSocket.SendAsync(segment, webSocketMessageType, endOfMessage: true, cancellationToken); + } + else + { + return webSocket.SendAsync(new ArraySegment(buffer.ToArray()), webSocketMessageType, true, cancellationToken); + } +#endif + } + } +} diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs index dd16ee549a..8c89ec9fce 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs @@ -2,21 +2,23 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Buffers; using System.IO; namespace Microsoft.AspNetCore.SignalR.Internal.Formatters { public static class BinaryMessageFormatter { - public unsafe static void WriteMessage(ReadOnlySpan payload, Stream output) + public static void WriteLengthPrefix(long length, Stream output) { // This code writes length prefix of the message as a VarInt. Read the comment in // the BinaryMessageParser.TryParseMessage for details. - var lenBuffer = stackalloc byte[5]; +#if NETCOREAPP2_1 + Span lenBuffer = stackalloc byte[5]; +#else + var lenBuffer = new byte[5]; +#endif var lenNumBytes = 0; - var length = payload.Length; do { ref var current = ref lenBuffer[lenNumBytes]; @@ -30,14 +32,11 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters } while (length > 0); - var buffer = ArrayPool.Shared.Rent(lenNumBytes + payload.Length); - var bufferSpan = buffer.AsSpan(); - - new ReadOnlySpan(lenBuffer, lenNumBytes).CopyTo(bufferSpan); - bufferSpan = bufferSpan.Slice(lenNumBytes); - payload.CopyTo(bufferSpan); - output.Write(buffer, 0, lenNumBytes + payload.Length); - ArrayPool.Shared.Return(buffer); +#if NETCOREAPP2_1 + output.Write(lenBuffer.Slice(0, lenNumBytes)); +#else + output.Write(lenBuffer, 0, lenNumBytes); +#endif } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageFormatter.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageFormatter.cs index ad63c4568d..50c4cebcc7 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageFormatter.cs @@ -1,8 +1,6 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using System; -using System.Buffers; using System.IO; namespace Microsoft.AspNetCore.SignalR.Internal.Formatters @@ -13,11 +11,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters // will not occur (is not a valid character) and therefore it is safe to not escape it internal static readonly byte RecordSeparator = 0x1e; - public static void WriteMessage(ReadOnlySpan payload, Stream output) + public static void WriteRecordSeparator(Stream output) { - var buffer = ArrayPool.Shared.Rent(payload.Length); - payload.CopyTo(buffer); - output.Write(buffer, 0, payload.Length); output.WriteByte(RecordSeparator); } } diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs index 8eeaaec772..e1aba404c2 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.IO; using System.Runtime.ExceptionServices; +using System.Text; using Microsoft.AspNetCore.SignalR.Internal.Formatters; using Microsoft.Extensions.Options; using Newtonsoft.Json; @@ -15,6 +16,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol { public class JsonHubProtocol : IHubProtocol { + private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); + private const string ResultPropertyName = "result"; private const string ItemPropertyName = "item"; private const string InvocationIdPropertyName = "invocationId"; @@ -59,13 +62,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol public void WriteMessage(HubMessage message, Stream output) { - using (var memoryStream = new MemoryStream()) - { - WriteMessageCore(message, memoryStream); - memoryStream.Flush(); - - TextMessageFormatter.WriteMessage(memoryStream.ToArray(), output); - } + WriteMessageCore(message, output); + TextMessageFormatter.WriteRecordSeparator(output); } private HubMessage ParseMessage(Stream input, IInvocationBinder binder) @@ -135,7 +133,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol private void WriteMessageCore(HubMessage message, Stream stream) { - using (var writer = new JsonTextWriter(new StreamWriter(stream))) + using (var writer = new JsonTextWriter(new StreamWriter(stream, _utf8NoBom, 1024, leaveOpen: true))) { writer.WriteStartObject(); switch (message) diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs index 3994c1bd0a..8428073934 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs @@ -5,6 +5,7 @@ using System; using System.Buffers; using System.Collections; using System.IO; +using System.Text; using Microsoft.AspNetCore.SignalR.Internal.Formatters; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -13,22 +14,21 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol { public static class NegotiationProtocol { + private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); + private const string ProtocolPropertyName = "protocol"; public static void WriteMessage(NegotiationMessage negotiationMessage, Stream output) { - using (var memoryStream = new MemoryStream()) + using (var writer = new JsonTextWriter(new StreamWriter(output, _utf8NoBom, 1024, leaveOpen: true))) { - using (var writer = new JsonTextWriter(new StreamWriter(memoryStream))) - { - writer.WriteStartObject(); - writer.WritePropertyName(ProtocolPropertyName); - writer.WriteValue(negotiationMessage.Protocol); - writer.WriteEndObject(); - } - - TextMessageFormatter.WriteMessage(memoryStream.ToArray(), output); + writer.WriteStartObject(); + writer.WritePropertyName(ProtocolPropertyName); + writer.WriteValue(negotiationMessage.Protocol); + writer.WriteEndObject(); } + + TextMessageFormatter.WriteRecordSeparator(output); } public static bool TryParseMessage(ReadOnlySpan input, out NegotiationMessage negotiationMessage) diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj index 1420cd5b9f..bf9c4302e2 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj +++ b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj @@ -2,7 +2,7 @@ Common serialiation primitives for SignalR Clients Servers - netstandard2.0 + netstandard2.0;netcoreapp2.1 Microsoft.AspNetCore.SignalR true diff --git a/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs index 641df71fc1..c14dfe7f9d 100644 --- a/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs @@ -229,10 +229,22 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol public void WriteMessage(HubMessage message, Stream output) { + // We're writing data into the memoryStream so that we can get the length prefix using (var memoryStream = new MemoryStream()) { WriteMessageCore(message, memoryStream); - BinaryMessageFormatter.WriteMessage(new ReadOnlySpan(memoryStream.ToArray()), output); + if (memoryStream.TryGetBuffer(out var buffer)) + { + // Write the buffer directly + BinaryMessageFormatter.WriteLengthPrefix(buffer.Count, output); + output.Write(buffer.Array, buffer.Offset, buffer.Count); + } + else + { + BinaryMessageFormatter.WriteLengthPrefix(memoryStream.Length, output); + memoryStream.Position = 0; + memoryStream.CopyTo(output); + } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs index 67178be7fc..1b2968ae72 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs @@ -123,12 +123,9 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.ReceivedMessages(); - // TODO: Use CopyToAsync here - var payload = await response.Content.ReadAsByteArrayAsync(); - if (payload.Length > 0) - { - await _application.Output.WriteAsync(payload); - } + var stream = new PipeWriterStream(_application.Output); + await response.Content.CopyToAsync(stream); + await _application.Output.FlushAsync(); } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj index a6d5d54d4b..afab54c3f3 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj @@ -2,11 +2,14 @@ Client for ASP.NET Core SignalR - netstandard2.0 + netstandard2.0;netcoreapp2.1 + + + diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendMessage.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendMessage.cs deleted file mode 100644 index 2cf570e9e5..0000000000 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendMessage.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System.Threading.Tasks; - -namespace Microsoft.AspNetCore.Sockets.Client -{ - public struct SendMessage - { - public byte[] Payload { get; } - public TaskCompletionSource SendResult { get; } - - public SendMessage(byte[] payload, TaskCompletionSource result) - { - Payload = payload; - SendResult = result; - } - } -} diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs index 4a4b7631d2..5c1a8830fa 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs @@ -2,7 +2,10 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; +using System.IO; using System.IO.Pipelines; +using System.Net; using System.Net.Http; using System.Threading; using System.Threading.Tasks; @@ -39,8 +42,7 @@ namespace Microsoft.AspNetCore.Sockets.Client var request = new HttpRequestMessage(HttpMethod.Post, sendUrl); PrepareHttpRequest(request, httpOptions); - // TODO: Use a custom stream implementation over the ReadOnlyBuffer - request.Content = new ByteArrayContent(buffer.ToArray()); + request.Content = new ReadOnlyBufferContent(buffer); var response = await httpClient.SendAsync(request, transportCts.Token); response.EnsureSuccessStatusCode(); @@ -96,5 +98,26 @@ namespace Microsoft.AspNetCore.Sockets.Client request.Headers.Add("Authorization", $"Bearer {httpOptions.AccessTokenFactory()}"); } } + + private class ReadOnlyBufferContent : HttpContent + { + private readonly ReadOnlyBuffer _buffer; + + public ReadOnlyBufferContent(ReadOnlyBuffer buffer) + { + _buffer = buffer; + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + return stream.WriteAsync(_buffer); + } + + protected override bool TryComputeLength(out long length) + { + length = _buffer.Length; + return true; + } + } } } diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs index d338b5eb5b..519b3ec88e 100644 --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs @@ -2,8 +2,10 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.IO.Pipelines; using System.Net.WebSockets; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Sockets.Client.Http; @@ -102,18 +104,23 @@ namespace Microsoft.AspNetCore.Sockets.Client { var memory = _application.Output.GetMemory(); - // REVIEW: Use new Memory websocket APIs on .NET Core 2.1 - memory.TryGetArray(out var arraySegment); +#if NETCOREAPP2_1 + var receiveResult = await _webSocket.ReceiveAsync(memory, _receiveCts.Token); +#else + var isArray = memory.TryGetArray(out var arraySegment); + Debug.Assert(isArray); // Exceptions are handled above where the send and receive tasks are being run. var receiveResult = await _webSocket.ReceiveAsync(arraySegment, _receiveCts.Token); +#endif + if (receiveResult.MessageType == WebSocketMessageType.Close) { - _logger.WebSocketClosed(receiveResult.CloseStatus); + _logger.WebSocketClosed(_webSocket.CloseStatus); - if (receiveResult.CloseStatus != WebSocketCloseStatus.NormalClosure) + if (_webSocket.CloseStatus != WebSocketCloseStatus.NormalClosure) { - throw new InvalidOperationException($"Websocket closed with error: {receiveResult.CloseStatus}."); + throw new InvalidOperationException($"Websocket closed with error: {_webSocket.CloseStatus}."); } return; @@ -162,7 +169,7 @@ namespace Microsoft.AspNetCore.Sockets.Client { _logger.ReceivedFromApp(buffer.Length); - await _webSocket.SendAsync(new ArraySegment(buffer.ToArray()), webSocketMessageType, true, _transportCts.Token); + await _webSocket.SendAsync(buffer, webSocketMessageType, _transportCts.Token); } else if (result.IsCompleted) { diff --git a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs index 78b669683f..4a98254367 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs @@ -12,7 +12,6 @@ using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Protocols; using Microsoft.AspNetCore.Sockets.Features; -using Microsoft.AspNetCore.Sockets.Http.Internal; using Microsoft.AspNetCore.Sockets.Internal; using Microsoft.AspNetCore.Sockets.Internal.Transports; using Microsoft.Extensions.Logging; diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs index 86ae885beb..877893e985 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.IO; using System.IO.Pipelines; using System.Runtime.InteropServices; using System.Threading; @@ -52,13 +53,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports try { - foreach (var segment in buffer) - { - var isArray = MemoryMarshal.TryGetArray(segment, out var arraySegment); - // We're using the managed memory pool which is backed by managed buffers - Debug.Assert(isArray); - await context.Response.Body.WriteAsync(arraySegment.Array, arraySegment.Offset, arraySegment.Count); - } + await context.Response.Body.WriteAsync(buffer); } finally { diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs index 21b079c263..2ca5123fab 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs @@ -3,7 +3,10 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO; +using System.Runtime.InteropServices; +using System.Threading.Tasks; namespace Microsoft.AspNetCore.Sockets.Internal.Formatters { @@ -14,7 +17,31 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters private const byte LineFeed = (byte)'\n'; - public static void WriteMessage(ReadOnlySpan payload, MemoryStream output) + public static async Task WriteMessageAsync(ReadOnlyBuffer payload, Stream output) + { + var ms = new MemoryStream(); + + // TODO: There are 2 improvements to be made here + // 1. Don't convert the entire payload into an array if if's multi-segmented. + // 2. Don't allocate the memory stream unless the payload contains \n. If it doesn't we can just write the buffers directly + // to the stream without modification. While it does mean that there will be smaller writes, should be fine for the most part + // since we're using reasonably sized buffers. + + if (payload.IsSingleSegment) + { + WriteMessage(payload.First, ms); + } + else + { + WriteMessage(payload.ToArray(), ms); + } + + ms.Position = 0; + + await ms.CopyToAsync(output); + } + + public static void WriteMessage(ReadOnlyMemory payload, Stream output) { // Write the payload WritePayload(payload, output); @@ -23,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters output.Write(Newline, 0, Newline.Length); } - private static void WritePayload(ReadOnlySpan payload, Stream output) + private static void WritePayload(ReadOnlyMemory payload, Stream output) { // Short-cut for empty payload if (payload.Length == 0) @@ -45,8 +72,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters var keepWriting = true; while (keepWriting) { + var span = payload.Span; // Seek to the end of buffer or newline - var sliceEnd = payload.IndexOf(LineFeed); + var sliceEnd = span.IndexOf(LineFeed); var nextSliceStart = sliceEnd + 1; if (sliceEnd < 0) { @@ -56,7 +84,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters // This is the last span keepWriting = false; } - if (sliceEnd > 0 && payload[sliceEnd - 1] == '\r') + if (sliceEnd > 0 && span[sliceEnd - 1] == '\r') { sliceEnd--; } @@ -65,7 +93,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters if (nextSliceStart >= payload.Length) { - payload = ReadOnlySpan.Empty; + payload = ReadOnlyMemory.Empty; } else { @@ -76,15 +104,20 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters } } - private static void WriteLine(ReadOnlySpan payload, Stream output) + private static void WriteLine(ReadOnlyMemory payload, Stream output) { output.Write(DataPrefix, 0, DataPrefix.Length); - var buffer = ArrayPool.Shared.Rent(payload.Length); - payload.CopyTo(buffer); - output.Write(buffer, 0, payload.Length); - ArrayPool.Shared.Return(buffer); - +#if NETCOREAPP2_1 + output.Write(payload.Span); +#else + if (payload.Length > 0) + { + var isArray = MemoryMarshal.TryGetArray(payload, out var segment); + Debug.Assert(isArray); + output.Write(segment.Array, segment.Offset, segment.Count); + } +#endif output.Write(Newline, 0, Newline.Length); } } diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs index 2526095b07..3f4c973c54 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs @@ -53,12 +53,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports { if (!buffer.IsEmpty) { - var ms = new MemoryStream(); _logger.SSEWritingMessage(buffer.Length); - // Don't create a copy using ToArray every time - ServerSentEventsMessageFormatter.WriteMessage(buffer.ToArray(), ms); - ms.Seek(0, SeekOrigin.Begin); - await ms.CopyToAsync(context.Response.Body); + + await ServerSentEventsMessageFormatter.WriteMessageAsync(buffer, context.Response.Body); } else if (result.IsCompleted) { diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs index ed90a9e099..a6edb00588 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs @@ -2,10 +2,10 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; using System.Diagnostics; using System.IO.Pipelines; using System.Net.WebSockets; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; @@ -105,7 +105,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports trigger.GetAwaiter().GetResult(); } - private async Task StartReceiving(WebSocket socket) + private async Task StartReceiving(WebSocket socket) { try { @@ -113,14 +113,18 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports { var memory = _application.Output.GetMemory(); - // REVIEW: Use new Memory websocket APIs on .NET Core 2.1 - memory.TryGetArray(out var arraySegment); +#if NETCOREAPP2_1 + var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None); +#else + var isArray = memory.TryGetArray(out var arraySegment); + Debug.Assert(isArray); // Exceptions are handled above where the send and receive tasks are being run. var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None); +#endif if (receiveResult.MessageType == WebSocketMessageType.Close) { - return receiveResult; + return; } _logger.MessageReceived(receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage); @@ -163,7 +167,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports if (WebSocketCanSend(ws)) { - await ws.SendAsync(new ArraySegment(buffer.ToArray()), webSocketMessageType, endOfMessage: true, cancellationToken: CancellationToken.None); + await ws.SendAsync(buffer, webSocketMessageType); } } catch (WebSocketException socketException) when (!WebSocketCanSend(ws)) diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj index 940cca23a8..1abac737f4 100644 --- a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj +++ b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj @@ -2,9 +2,15 @@ Components for providing real-time bi-directional communication across the Web. - netstandard2.0 + netstandard2.0;netcoreapp2.1 + + + + + + diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs index d1107bf394..a5651837e3 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs @@ -115,7 +115,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests private byte[] FormatMessageToArray(byte[] message) { var output = new MemoryStream(); - TextMessageFormatter.WriteMessage(message, output); + output.Write(message, 0, message.Length); + TextMessageFormatter.WriteRecordSeparator(output); return output.ToArray(); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs index a60405217b..f650173399 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs @@ -33,7 +33,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters var output = new MemoryStream(); // Use small chunks to test Advance/Enlarge and partial payload writing foreach (var message in messages) { - BinaryMessageFormatter.WriteMessage(message, output); + BinaryMessageFormatter.WriteLengthPrefix(message.Length, output); + output.Write(message, 0, message.Length); } Assert.Equal(expectedEncoding, output.ToArray()); @@ -77,7 +78,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters output.Seek(offset, SeekOrigin.Begin); } - BinaryMessageFormatter.WriteMessage(payload, output); + BinaryMessageFormatter.WriteLengthPrefix(payload.Length, output); + output.Write(payload, 0, payload.Length); Assert.Equal(encoded, output.ToArray().Skip(offset)); } @@ -97,7 +99,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters output.Seek(offset, SeekOrigin.Begin); } - BinaryMessageFormatter.WriteMessage(message, output); + BinaryMessageFormatter.WriteLengthPrefix(message.Length, output); + output.Write(message, 0, message.Length); Assert.Equal(encoded, output.ToArray().Skip(offset)); } @@ -108,7 +111,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters { using (var ms = new MemoryStream()) { - BinaryMessageFormatter.WriteMessage(payload, ms); + BinaryMessageFormatter.WriteLengthPrefix(payload.Length, ms); + ms.Write(payload, 0, payload.Length); var buffer = new ReadOnlySpan(ms.ToArray()); Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped)); Assert.Equal(payload, roundtripped.ToArray()); diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageFormatterTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageFormatterTests.cs index f9ae1b4fd5..ad4b1c0916 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageFormatterTests.cs @@ -16,7 +16,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters { using (var ms = new MemoryStream()) { - TextMessageFormatter.WriteMessage(new ReadOnlySpan(Encoding.UTF8.GetBytes("ABC")), ms); + var buffer = Encoding.UTF8.GetBytes("ABC"); + ms.Write(buffer, 0, buffer.Length); + TextMessageFormatter.WriteRecordSeparator(ms); Assert.Equal("ABC\u001e", Encoding.UTF8.GetString(ms.ToArray())); } } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs index 21535851b9..220ccafc9f 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs @@ -206,7 +206,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol private static byte[] FormatMessageToArray(byte[] message) { var output = new MemoryStream(); - TextMessageFormatter.WriteMessage(message, output); + output.Write(message, 0, message.Length); + TextMessageFormatter.WriteRecordSeparator(output); return output.ToArray(); } } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs index 762a79df88..84c8d7c845 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs @@ -485,8 +485,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol { using (var stream = new MemoryStream()) { - BinaryMessageFormatter.WriteMessage(input, stream); - stream.Flush(); + BinaryMessageFormatter.WriteLengthPrefix(input.Length, stream); + stream.Write(input, 0, input.Length); return stream.ToArray(); } }