From 19b9dca268fc31b74b9c84dbc444876fdef0af46 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 28 Mar 2018 12:08:16 -0700 Subject: [PATCH] Change IHubProtocol interface to support partial parsing (#1745) - These are the finishing touches before we disable batching on the C# client and on the server. We're changing the IHubProtocol interface to modify the input buffer with what was consumed. We're also changing it to parse a single message at a time to be match what output writing does. - Added TryParseResponseMessage and made it look like TryParseRequestMessage --- .../DefaultHubDispatcherBenchmark.cs | 4 +- .../HubProtocolBenchmark.cs | 5 +- .../MessageParserBenchmark.cs | 9 +- .../HubConnection.Log.cs | 32 ---- .../HubConnection.cs | 168 +++++++++--------- .../Formatters/BinaryMessageParser.cs | 16 +- .../Internal/Formatters/TextMessageParser.cs | 17 -- .../Internal/Protocol/HandshakeProtocol.cs | 38 ++-- .../Internal/Protocol/IHubProtocol.cs | 3 +- .../Internal/Protocol/JsonHubProtocol.cs | 34 ++-- .../Internal/Protocol/Utf8BufferTextReader.cs | 9 +- .../HubConnectionContext.cs | 12 +- .../HubConnectionHandler.cs | 21 +-- .../Protocol/MessagePackHubProtocol.cs | 35 ++-- .../HubConnectionTests.cs | 5 +- .../Formatters/BinaryMessageFormatterTests.cs | 3 +- .../Formatters/BinaryMessageParserTests.cs | 13 +- .../Formatters/TextMessageParserTests.cs | 9 +- .../Protocol/HandshakeProtocolTests.cs | 29 ++- .../Internal/Protocol/JsonHubProtocolTests.cs | 29 +-- .../Protocol/MessagePackHubProtocolTests.cs | 45 ++--- .../Protocol/Utf8BufferTextReaderTests.cs | 10 +- .../TestClient.cs | 31 +--- 23 files changed, 265 insertions(+), 312 deletions(-) diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/DefaultHubDispatcherBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/DefaultHubDispatcherBenchmark.cs index bae4615dbf..29c2984930 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/DefaultHubDispatcherBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/DefaultHubDispatcherBenchmark.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.IO.Pipelines; @@ -58,8 +59,9 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks return true; } - public bool TryParseMessages(ReadOnlyMemory input, IInvocationBinder binder, IList messages) + public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage message) { + message = null; return false; } diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubProtocolBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubProtocolBenchmark.cs index 5d8468a609..07003d01c4 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubProtocolBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/HubProtocolBenchmark.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using BenchmarkDotNet.Attributes; @@ -59,8 +60,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks [Benchmark] public void ReadSingleMessage() { - var messages = new List(); - if (!_hubProtocol.TryParseMessages(_binaryInput, _binder, messages)) + var data = new ReadOnlySequence(_binaryInput); + if (!_hubProtocol.TryParseMessage(ref data, _binder, out _)) { throw new InvalidOperationException("Failed to read message"); } diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs index f05fd178da..a00ca8eba0 100644 --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.IO; using BenchmarkDotNet.Attributes; using Microsoft.AspNetCore.SignalR.Internal.Formatters; @@ -40,8 +41,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks [Benchmark] public void SingleBinaryMessage() { - ReadOnlyMemory buffer = _binaryInput; - if (!BinaryMessageParser.TryParseMessage(ref buffer, out _)) + var data = new ReadOnlySequence(_binaryInput); + if (!BinaryMessageParser.TryParseMessage(ref data, out _)) { throw new InvalidOperationException("Failed to parse"); } @@ -50,8 +51,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks [Benchmark] public void SingleTextMessage() { - ReadOnlyMemory buffer = _textInput; - if (!TextMessageParser.TryParseMessage(ref buffer, out _)) + var data = new ReadOnlySequence(_textInput); + if (!TextMessageParser.TryParseMessage(ref data, out _)) { throw new InvalidOperationException("Failed to parse"); } diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.Log.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.Log.cs index 4edad81a03..b478579acf 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.Log.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.Log.cs @@ -96,21 +96,9 @@ namespace Microsoft.AspNetCore.SignalR.Client private static readonly Action _sendingHubHandshake = LoggerMessage.Define(LogLevel.Debug, new EventId(28, "SendingHubHandshake"), "Sending Hub Handshake."); - private static readonly Action _parsingMessages = - LoggerMessage.Define(LogLevel.Debug, new EventId(29, "ParsingMessages"), "Received {Count} bytes. Parsing message(s)."); - - private static readonly Action _receivingMessages = - LoggerMessage.Define(LogLevel.Debug, new EventId(30, "ReceivingMessages"), "Received {MessageCount} message(s)."); - private static readonly Action _receivedPing = LoggerMessage.Define(LogLevel.Trace, new EventId(31, "ReceivedPing"), "Received a ping message."); - private static readonly Action _processedMessages = - LoggerMessage.Define(LogLevel.Debug, new EventId(32, "ProcessedMessages"), "Finished processing {MessageCount} message(s)."); - - private static readonly Action _failedParsing = - LoggerMessage.Define(LogLevel.Warning, new EventId(33, "FailedParsing"), "No messages parsed from {Count} byte(s)."); - private static readonly Action _errorInvokingClientSideMethod = LoggerMessage.Define(LogLevel.Error, new EventId(34, "ErrorInvokingClientSideMethod"), "Invoking client side method '{MethodName}' failed."); @@ -329,31 +317,11 @@ namespace Microsoft.AspNetCore.SignalR.Client _sendingHubHandshake(logger, null); } - public static void ParsingMessages(ILogger logger, int byteCount) - { - _parsingMessages(logger, byteCount, null); - } - - public static void ReceivingMessages(ILogger logger, int messageCount) - { - _receivingMessages(logger, messageCount, null); - } - public static void ReceivedPing(ILogger logger) { _receivedPing(logger, null); } - public static void ProcessedMessages(ILogger logger, int messageCount) - { - _processedMessages(logger, messageCount, null); - } - - public static void FailedParsing(ILogger logger, int byteCount) - { - _failedParsing(logger, byteCount, null); - } - public static void ErrorInvokingClientSideMethod(ILogger logger, string methodName, Exception exception) { _errorInvokingClientSideMethod(logger, methodName, exception); diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs index c588fa4a9f..a0d4c23f2e 100644 --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs @@ -178,7 +178,7 @@ namespace Microsoft.AspNetCore.SignalR.Client CheckDisposed(); connectionState = _connectionState; - + // Set the stopping flag so that any invocations after this get a useful error message instead of // silently failing or throwing an error about the pipe being completed. if (connectionState != null) @@ -374,74 +374,53 @@ namespace Microsoft.AspNetCore.SignalR.Client } } - private async Task<(bool close, Exception exception)> ProcessMessagesAsync(ReadOnlySequence buffer, ConnectionState connectionState) + private async Task<(bool close, Exception exception)> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState) { - Log.ProcessingMessage(_logger, buffer.Length); - - // TODO: Don't ToArray it :) - var data = buffer.ToArray(); - - var currentData = new ReadOnlyMemory(data); - Log.ParsingMessages(_logger, currentData.Length); - - var messages = new List(); - if (_protocol.TryParseMessages(currentData, connectionState, messages)) + InvocationRequest irq; + switch (message) { - Log.ReceivingMessages(_logger, messages.Count); - foreach (var message in messages) - { - InvocationRequest irq; - switch (message) + case InvocationMessage invocation: + Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, + invocation.ArgumentBindingException != null ? null : invocation.Arguments); + await DispatchInvocationAsync(invocation); + break; + case CompletionMessage completion: + if (!connectionState.TryRemoveInvocation(completion.InvocationId, out irq)) { - case InvocationMessage invocation: - Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, - invocation.ArgumentBindingException != null ? null : invocation.Arguments); - await DispatchInvocationAsync(invocation); - break; - case CompletionMessage completion: - if (!connectionState.TryRemoveInvocation(completion.InvocationId, out irq)) - { - Log.DroppedCompletionMessage(_logger, completion.InvocationId); - } - else - { - DispatchInvocationCompletion(completion, irq); - irq.Dispose(); - } - break; - case StreamItemMessage streamItem: - // Complete the invocation with an error, we don't support streaming (yet) - if (!connectionState.TryGetInvocation(streamItem.InvocationId, out irq)) - { - Log.DroppedStreamMessage(_logger, streamItem.InvocationId); - return (close: false, exception: null); - } - await DispatchInvocationStreamItemAsync(streamItem, irq); - break; - case CloseMessage close: - if (string.IsNullOrEmpty(close.Error)) - { - Log.ReceivedClose(_logger); - return (close: true, exception: null); - } - else - { - Log.ReceivedCloseWithError(_logger, close.Error); - return (close: true, exception: new HubException($"The server closed the connection with the following error: {close.Error}")); - } - case PingMessage _: - Log.ReceivedPing(_logger); - // Nothing to do on receipt of a ping. - break; - default: - throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}"); + Log.DroppedCompletionMessage(_logger, completion.InvocationId); } - } - Log.ProcessedMessages(_logger, messages.Count); - } - else - { - Log.FailedParsing(_logger, data.Length); + else + { + DispatchInvocationCompletion(completion, irq); + irq.Dispose(); + } + break; + case StreamItemMessage streamItem: + // Complete the invocation with an error, we don't support streaming (yet) + if (!connectionState.TryGetInvocation(streamItem.InvocationId, out irq)) + { + Log.DroppedStreamMessage(_logger, streamItem.InvocationId); + return (close: false, exception: null); + } + await DispatchInvocationStreamItemAsync(streamItem, irq); + break; + case CloseMessage close: + if (string.IsNullOrEmpty(close.Error)) + { + Log.ReceivedClose(_logger); + return (close: true, exception: null); + } + else + { + Log.ReceivedCloseWithError(_logger, close.Error); + return (close: true, exception: new HubException($"The server closed the connection with the following error: {close.Error}")); + } + case PingMessage _: + Log.ReceivedPing(_logger); + // Nothing to do on receipt of a ping. + break; + default: + throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}"); } return (close: false, exception: null); @@ -536,25 +515,23 @@ namespace Microsoft.AspNetCore.SignalR.Client { var result = await _connectionState.Connection.Transport.Input.ReadAsync(); var buffer = result.Buffer; - var consumed = buffer.Start; try { // Read first message out of the incoming data - if (!buffer.IsEmpty && TextMessageParser.TryParseMessage(ref buffer, out var payload)) + if (!buffer.IsEmpty) { - // Buffer was advanced to the end of the message by TryParseMessage - consumed = buffer.Start; - var message = HandshakeProtocol.ParseResponseMessage(payload.ToArray()); - - if (!string.IsNullOrEmpty(message.Error)) + if (HandshakeProtocol.TryParseResponseMessage(ref buffer, out var message)) { - Log.HandshakeServerError(_logger, message.Error); - throw new HubException( - $"Unable to complete handshake with the server due to an error: {message.Error}"); - } + if (!string.IsNullOrEmpty(message.Error)) + { + Log.HandshakeServerError(_logger, message.Error); + throw new HubException( + $"Unable to complete handshake with the server due to an error: {message.Error}"); + } - break; + break; + } } else if (result.IsCompleted) { @@ -565,7 +542,10 @@ namespace Microsoft.AspNetCore.SignalR.Client } finally { - _connectionState.Connection.Transport.Input.AdvanceTo(consumed); + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + _connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End); } } } @@ -594,8 +574,6 @@ namespace Microsoft.AspNetCore.SignalR.Client { var result = await connectionState.Connection.Transport.Input.ReadAsync(); var buffer = result.Buffer; - var consumed = buffer.End; // TODO: Support partial messages - var examined = buffer.End; try { @@ -608,12 +586,27 @@ namespace Microsoft.AspNetCore.SignalR.Client { ResetTimeoutTimer(timeoutTimer); - // We have data, process it - var (close, exception) = await ProcessMessagesAsync(buffer, connectionState); + Log.ProcessingMessage(_logger, buffer.Length); + + var close = false; + + while (_protocol.TryParseMessage(ref buffer, connectionState, out var message)) + { + Exception exception; + + // We have data, process it + (close, exception) = await ProcessMessagesAsync(message, connectionState); + if (close) + { + // Closing because we got a close frame, possibly with an error in it. + connectionState.CloseException = exception; + break; + } + } + + // If we're closing stop everything if (close) { - // Closing because we got a close frame, possibly with an error in it. - connectionState.CloseException = exception; break; } } @@ -624,7 +617,10 @@ namespace Microsoft.AspNetCore.SignalR.Client } finally { - connectionState.Connection.Transport.Input.AdvanceTo(consumed, examined); + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + connectionState.Connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End); } } } @@ -633,7 +629,7 @@ namespace Microsoft.AspNetCore.SignalR.Client Log.ServerDisconnectedWithError(_logger, ex); connectionState.CloseException = ex; } - + // Clear the connectionState field await WaitConnectionLockAsync(); try diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs index 4d80c937b7..9c5ca212eb 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; namespace Microsoft.AspNetCore.SignalR.Internal.Formatters { @@ -9,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters { private const int MaxLengthPrefixSize = 5; - public static bool TryParseMessage(ref ReadOnlyMemory buffer, out ReadOnlyMemory payload) + public static bool TryParseMessage(ref ReadOnlySequence buffer, out ReadOnlySequence payload) { if (buffer.IsEmpty) { @@ -33,7 +34,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters var numBytes = 0; var lengthPrefixBuffer = buffer.Slice(0, Math.Min(MaxLengthPrefixSize, buffer.Length)); - var span = lengthPrefixBuffer.Span; + var span = GetSpan(lengthPrefixBuffer); byte byteRead; do @@ -70,5 +71,16 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters buffer = buffer.Slice(numBytes + (int)length); return true; } + + private static ReadOnlySpan GetSpan(in ReadOnlySequence lengthPrefixBuffer) + { + if (lengthPrefixBuffer.IsSingleSegment) + { + return lengthPrefixBuffer.First.Span; + } + + // Should be rare + return lengthPrefixBuffer.ToArray(); + } } } \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs index b7512d827c..566465e278 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs @@ -24,22 +24,5 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters return true; } - - public static bool TryParseMessage(ref ReadOnlyMemory buffer, out ReadOnlyMemory payload) - { - var index = buffer.Span.IndexOf(TextMessageFormatter.RecordSeparator); - if (index == -1) - { - payload = default; - return false; - } - - payload = buffer.Slice(0, index); - - // Skip record separator - buffer = buffer.Slice(index + 1); - - return true; - } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/HandshakeProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/HandshakeProtocol.cs index 2ddc5a8f4a..a2c662775c 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/HandshakeProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/HandshakeProtocol.cs @@ -56,8 +56,14 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol return new JsonTextWriter(new StreamWriter(output, _utf8NoBom, 1024, leaveOpen: true)); } - public static HandshakeResponseMessage ParseResponseMessage(ReadOnlyMemory payload) + public static bool TryParseResponseMessage(ref ReadOnlySequence buffer, out HandshakeResponseMessage responseMessage) { + if (!TextMessageParser.TryParseMessage(ref buffer, out var payload)) + { + responseMessage = null; + return false; + } + var textReader = Utf8BufferTextReader.Get(payload); try @@ -76,7 +82,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol } var error = JsonUtils.GetOptionalProperty(handshakeJObject, ErrorPropertyName); - return new HandshakeResponseMessage(error); + responseMessage = new HandshakeResponseMessage(error); + return true; } } finally @@ -85,19 +92,14 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol } } - public static bool TryParseRequestMessage(ReadOnlySequence buffer, out HandshakeRequestMessage requestMessage, out SequencePosition consumed, out SequencePosition examined) + public static bool TryParseRequestMessage(ref ReadOnlySequence buffer, out HandshakeRequestMessage requestMessage) { - if (!TryReadMessageIntoSingleMemory(buffer, out consumed, out examined, out var memory)) + if (!TextMessageParser.TryParseMessage(ref buffer, out var payload)) { requestMessage = null; return false; } - if (!TextMessageParser.TryParseMessage(ref memory, out var payload)) - { - throw new InvalidDataException("Unable to parse payload as a handshake request message."); - } - var textReader = Utf8BufferTextReader.Get(payload); try { @@ -117,23 +119,5 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol return true; } - - internal static bool TryReadMessageIntoSingleMemory(ReadOnlySequence buffer, out SequencePosition consumed, out SequencePosition examined, out ReadOnlyMemory memory) - { - var separator = buffer.PositionOf(TextMessageFormatter.RecordSeparator); - if (separator == null) - { - // Haven't seen the entire message so bail - consumed = buffer.Start; - examined = buffer.End; - memory = null; - return false; - } - - consumed = buffer.GetPosition(1, separator.Value); - examined = consumed; - memory = buffer.IsSingleSegment ? buffer.First : buffer.ToArray(); - return true; - } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs index 7430391c7e..b369714979 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using Microsoft.AspNetCore.Connections; @@ -16,7 +17,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol TransferFormat TransferFormat { get; } - bool TryParseMessages(ReadOnlyMemory input, IInvocationBinder binder, IList messages); + bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage message); void WriteMessage(HubMessage message, Stream output); diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs index f46117dca7..0e0dfb15e3 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Runtime.ExceptionServices; @@ -54,27 +55,26 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol return version == Version; } - public bool TryParseMessages(ReadOnlyMemory input, IInvocationBinder binder, IList messages) + public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage message) { - while (TextMessageParser.TryParseMessage(ref input, out var payload)) + if (!TextMessageParser.TryParseMessage(ref input, out var payload)) { - var textReader = Utf8BufferTextReader.Get(payload); - - try - { - var message = ParseMessage(textReader, binder); - if (message != null) - { - messages.Add(message); - } - } - finally - { - Utf8BufferTextReader.Return(textReader); - } + message = null; + return false; } - return messages.Count > 0; + var textReader = Utf8BufferTextReader.Get(payload); + + try + { + message = ParseMessage(textReader, binder); + } + finally + { + Utf8BufferTextReader.Return(textReader); + } + + return message != null; } public void WriteMessage(HubMessage message, Stream output) diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/Utf8BufferTextReader.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/Utf8BufferTextReader.cs index 227f74fc98..5a6cbb99bd 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/Utf8BufferTextReader.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/Utf8BufferTextReader.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.IO; using System.Runtime.InteropServices; using System.Text; @@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol { internal class Utf8BufferTextReader : TextReader { - private ReadOnlyMemory _utf8Buffer; + private ReadOnlySequence _utf8Buffer; private Decoder _decoder; [ThreadStatic] @@ -25,7 +26,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol _decoder = Encoding.UTF8.GetDecoder(); } - public static Utf8BufferTextReader Get(ReadOnlyMemory utf8Buffer) + public static Utf8BufferTextReader Get(in ReadOnlySequence utf8Buffer) { var reader = _cachedInstance; if (reader == null) @@ -55,7 +56,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol #endif } - public void SetBuffer(ReadOnlyMemory utf8Buffer) + public void SetBuffer(in ReadOnlySequence utf8Buffer) { _utf8Buffer = utf8Buffer; _decoder.Reset(); @@ -68,7 +69,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol return 0; } - var source = _utf8Buffer.Span; + var source = _utf8Buffer.First.Span; var bytesUsed = 0; var charsUsed = 0; #if NETCOREAPP2_1 diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs index aa13f4cac8..525cf55136 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs @@ -221,15 +221,19 @@ namespace Microsoft.AspNetCore.SignalR { var result = await _connectionContext.Transport.Input.ReadAsync(cts.Token); var buffer = result.Buffer; - var consumed = buffer.End; + var consumed = buffer.Start; var examined = buffer.End; try { if (!buffer.IsEmpty) { - if (HandshakeProtocol.TryParseRequestMessage(buffer, out var handshakeRequestMessage, out consumed, out examined)) + if (HandshakeProtocol.TryParseRequestMessage(ref buffer, out var handshakeRequestMessage)) { + // We parsed the handshake + consumed = buffer.Start; + examined = consumed; + Protocol = protocolResolver.GetProtocol(handshakeRequestMessage.Protocol, supportedProtocols); if (Protocol == null) { @@ -277,6 +281,10 @@ namespace Microsoft.AspNetCore.SignalR await WriteHandshakeResponseAsync(HandshakeResponseMessage.Empty); return true; } + else + { + _logger.LogInformation("Didn't parse the handshake"); + } } else if (result.IsCompleted) { diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs index d54eeac045..ea0d5aaaaf 100644 --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs @@ -164,24 +164,16 @@ namespace Microsoft.AspNetCore.SignalR { var result = await connection.Input.ReadAsync(connection.ConnectionAborted); var buffer = result.Buffer; - var consumed = buffer.End; - var examined = buffer.End; try { if (!buffer.IsEmpty) { - var hubMessages = new List(); - - // TODO: Make this incremental - if (connection.Protocol.TryParseMessages(buffer.ToArray(), _dispatcher, hubMessages)) + while (connection.Protocol.TryParseMessage(ref buffer, _dispatcher, out var message)) { - foreach (var hubMessage in hubMessages) - { - // Don't wait on the result of execution, continue processing other - // incoming messages on this connection. - _ = _dispatcher.DispatchMessageAsync(connection, hubMessage); - } + // Don't wait on the result of execution, continue processing other + // incoming messages on this connection. + _ = _dispatcher.DispatchMessageAsync(connection, message); } } else if (result.IsCompleted) @@ -191,7 +183,10 @@ namespace Microsoft.AspNetCore.SignalR } finally { - connection.Input.AdvanceTo(consumed, examined); + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + connection.Input.AdvanceTo(buffer.Start, buffer.End); } } } diff --git a/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs index 59d4a93ec7..2b9f130e08 100644 --- a/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs +++ b/src/Microsoft.AspNetCore.SignalR.Protocols.MsgPack/Internal/Protocol/MessagePackHubProtocol.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -46,21 +47,33 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol return version == Version; } - public bool TryParseMessages(ReadOnlyMemory input, IInvocationBinder binder, IList messages) + public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage message) { - while (BinaryMessageParser.TryParseMessage(ref input, out var payload)) + if (!BinaryMessageParser.TryParseMessage(ref input, out var payload)) { - var isArray = MemoryMarshal.TryGetArray(payload, out var arraySegment); - // This will never be false unless we started using un-managed buffers - Debug.Assert(isArray); - var message = ParseMessage(arraySegment.Array, arraySegment.Offset, binder); - if (message != null) - { - messages.Add(message); - } + message = null; + return false; } - return messages.Count > 0; + var arraySegment = GetArraySegment(payload); + + message = ParseMessage(arraySegment.Array, arraySegment.Offset, binder); + + return message != null; + } + + private static ArraySegment GetArraySegment(ReadOnlySequence input) + { + if (input.IsSingleSegment) + { + var isArray = MemoryMarshal.TryGetArray(input.First, out var arraySegment); + // This will never be false unless we started using un-managed buffers + Debug.Assert(isArray); + return arraySegment; + } + + // Should be rare + return new ArraySegment(input.ToArray()); } private static HubMessage ParseMessage(byte[] input, int startOffset, IInvocationBinder binder) diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs index d043e00c1b..1c7f7ed669 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Threading.Tasks; @@ -144,7 +145,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests return true; } - public bool TryParseMessages(ReadOnlyMemory input, IInvocationBinder binder, IList messages) + public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage message) { if (_error != null) { @@ -152,7 +153,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests } if (_parsed != null) { - messages.Add(_parsed); + message = _parsed; return true; } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs index 3186bfa045..9d55c0a186 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -113,7 +114,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters { BinaryMessageFormatter.WriteLengthPrefix(payload.Length, ms); ms.Write(payload, 0, payload.Length); - var buffer = new ReadOnlyMemory(ms.ToArray()); + var buffer = new ReadOnlySequence(ms.ToArray()); Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped)); Assert.Equal(payload, roundtripped.ToArray()); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs index 385be0d6e0..ad7139a331 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.Text; using Microsoft.AspNetCore.SignalR.Internal.Formatters; @@ -17,7 +18,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [InlineData(new byte[] { 0x0B, 0x41, 0x0A, 0x52, 0x0D, 0x43, 0x0D, 0x0A, 0x3B, 0x44, 0x45, 0x46 }, "A\nR\rC\r\n;DEF")] public void ReadMessage(byte[] encoded, string payload) { - ReadOnlyMemory span = encoded; + var span = new ReadOnlySequence(encoded); Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message)); Assert.Equal(0, span.Length); @@ -52,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters })] public void ReadBinaryMessage(byte[] encoded, byte[] payload) { - ReadOnlyMemory< byte> span = encoded; + var span = new ReadOnlySequence(encoded); Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message)); Assert.Equal(0, span.Length); Assert.Equal(payload, message.ToArray()); @@ -66,7 +67,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters { var ex = Assert.Throws(() => { - var buffer = new ReadOnlyMemory(payload); + var buffer = new ReadOnlySequence(payload);; BinaryMessageParser.TryParseMessage(ref buffer, out var message); }); Assert.Equal("Messages over 2GB in size are not supported.", ex.Message); @@ -79,7 +80,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [InlineData(new byte[] { 0x80 })] // size is cut public void BinaryMessageParserReturnsFalseForPartialPayloads(byte[] payload) { - var buffer = new ReadOnlyMemory(payload); + var buffer = new ReadOnlySequence(payload); Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message)); } @@ -94,7 +95,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters /* body: */ 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x2C, 0x0D, 0x0A, 0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21, }; - ReadOnlyMemory buffer = encoded; + var buffer = new ReadOnlySequence(encoded); var messages = new List(); while (BinaryMessageParser.TryParseMessage(ref buffer, out var message)) { @@ -113,7 +114,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [InlineData(new byte[] { 0x09, 0x00, 0x00 })] // Not enough data for payload public void ReadIncompleteMessages(byte[] encoded) { - ReadOnlyMemory buffer = encoded; + var buffer = new ReadOnlySequence(encoded); Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message)); Assert.Equal(encoded.Length, buffer.Length); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs index 9dbc7b2866..f7a337e174 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Text; using Microsoft.AspNetCore.SignalR.Internal.Formatters; using Xunit; @@ -13,7 +14,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [Fact] public void ReadMessage() { - var message = new ReadOnlyMemory(Encoding.UTF8.GetBytes("ABC\u001e")); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes("ABC\u001e")); Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload)); Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray())); @@ -23,14 +24,14 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [Fact] public void TryReadingIncompleteMessage() { - var message = new ReadOnlyMemory(Encoding.UTF8.GetBytes("ABC")); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes("ABC")); Assert.False(TextMessageParser.TryParseMessage(ref message, out var payload)); } [Fact] public void TryReadingMultipleMessages() { - var message = new ReadOnlyMemory(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e")); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e")); Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload)); Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray())); Assert.True(TextMessageParser.TryParseMessage(ref message, out payload)); @@ -40,7 +41,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters [Fact] public void IncompleteTrailingMessage() { - var message = new ReadOnlyMemory(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123")); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123")); Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload)); Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray())); Assert.True(TextMessageParser.TryParseMessage(ref message, out payload)); diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/HandshakeProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/HandshakeProtocolTests.cs index ab05298a79..dcb35d5839 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/HandshakeProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/HandshakeProtocolTests.cs @@ -18,9 +18,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [InlineData("{\"protocol\":null,\"version\":123}\u001e", null, 123)] public void ParsingHandshakeRequestMessageSuccessForValidMessages(string json, string protocol, int version) { - var message = Encoding.UTF8.GetBytes(json); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes(json)); - Assert.True(HandshakeProtocol.TryParseRequestMessage(new ReadOnlySequence(message), out var deserializedMessage, out _, out _)); + Assert.True(HandshakeProtocol.TryParseRequestMessage(ref message, out var deserializedMessage)); Assert.Equal(protocol, deserializedMessage.Protocol); Assert.Equal(version, deserializedMessage.Version); @@ -33,19 +33,18 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [InlineData("{}\u001e", null)] public void ParsingHandshakeResponseMessageSuccessForValidMessages(string json, string error) { - var message = Encoding.UTF8.GetBytes(json); - - var response = HandshakeProtocol.ParseResponseMessage(message); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes(json)); + Assert.True(HandshakeProtocol.TryParseResponseMessage(ref message, out var response)); Assert.Equal(error, response.Error); } [Fact] public void ParsingHandshakeRequestNotCompleteReturnsFalse() { - var message = Encoding.UTF8.GetBytes("42"); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes("42")); - Assert.False(HandshakeProtocol.TryParseRequestMessage(new ReadOnlySequence(message), out _, out _, out _)); + Assert.False(HandshakeProtocol.TryParseRequestMessage(ref message, out _)); } [Theory] @@ -59,25 +58,25 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [InlineData("{\"protocol\":null,\"version\":\"123\"}\u001e", "Expected 'version' to be of type Integer.")] public void ParsingHandshakeRequestMessageThrowsForInvalidMessages(string payload, string expectedMessage) { - var message = Encoding.UTF8.GetBytes(payload); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes(payload)); var exception = Assert.Throws(() => - Assert.True(HandshakeProtocol.TryParseRequestMessage(new ReadOnlySequence(message), out _, out _, out _))); + Assert.True(HandshakeProtocol.TryParseRequestMessage(ref message, out _))); Assert.Equal(expectedMessage, exception.Message); } [Theory] - [InlineData("42", "Unexpected JSON Token Type 'Integer'. Expected a JSON Object.")] - [InlineData("\"42\"", "Unexpected JSON Token Type 'String'. Expected a JSON Object.")] - [InlineData("null", "Unexpected JSON Token Type 'Null'. Expected a JSON Object.")] - [InlineData("[]", "Unexpected JSON Token Type 'Array'. Expected a JSON Object.")] + [InlineData("42\u001e", "Unexpected JSON Token Type 'Integer'. Expected a JSON Object.")] + [InlineData("\"42\"\u001e", "Unexpected JSON Token Type 'String'. Expected a JSON Object.")] + [InlineData("null\u001e", "Unexpected JSON Token Type 'Null'. Expected a JSON Object.")] + [InlineData("[]\u001e", "Unexpected JSON Token Type 'Array'. Expected a JSON Object.")] public void ParsingHandshakeResponseMessageThrowsForInvalidMessages(string payload, string expectedMessage) { - var message = Encoding.UTF8.GetBytes(payload); + var message = new ReadOnlySequence(Encoding.UTF8.GetBytes(payload)); var exception = Assert.Throws(() => - HandshakeProtocol.ParseResponseMessage(message)); + HandshakeProtocol.TryParseRequestMessage(ref message, out _)); Assert.Equal(expectedMessage, exception.Message); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs index 1e63795e50..71ceb25eec 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/JsonHubProtocolTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Text; @@ -133,10 +134,10 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol var binder = new TestBinder(expectedMessage); var protocol = new JsonHubProtocol(Options.Create(protocolOptions)); - var messages = new List(); - protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, messages); + var data = new ReadOnlySequence(Encoding.UTF8.GetBytes(input)); + protocol.TryParseMessage(ref data, binder, out var message); - Assert.Equal(expectedMessage, messages[0], TestHubMessageEqualityComparer.Instance); + Assert.Equal(expectedMessage, message, TestHubMessageEqualityComparer.Instance); } [Theory] @@ -183,8 +184,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol var binder = new TestBinder(Array.Empty(), typeof(object)); var protocol = new JsonHubProtocol(); - var messages = new List(); - var ex = Assert.Throws(() => protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, messages)); + var data = new ReadOnlySequence(Encoding.UTF8.GetBytes(input)); + var ex = Assert.Throws(() => protocol.TryParseMessage(ref data, binder, out var _)); Assert.Equal(expectedMessage, ex.Message); } @@ -196,10 +197,10 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol var binder = new TestBinder(expectedMessage); var protocol = new JsonHubProtocol(); - var messages = new List(); - protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, messages); + var data = new ReadOnlySequence(Encoding.UTF8.GetBytes(input)); + protocol.TryParseMessage(ref data, binder, out var message); - Assert.Equal(expectedMessage, messages[0], TestHubMessageEqualityComparer.Instance); + Assert.Equal(expectedMessage, message, TestHubMessageEqualityComparer.Instance); } [Theory] @@ -210,9 +211,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol var binder = new TestBinder(paramTypes: new[] { typeof(int), typeof(string) }, returnType: typeof(bool)); var protocol = new JsonHubProtocol(); - var messages = new List(); - Assert.True(protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, messages)); - Assert.Single(messages); + var data = new ReadOnlySequence(Encoding.UTF8.GetBytes(input)); + Assert.True(protocol.TryParseMessage(ref data, binder, out var message)); + Assert.NotNull(message); } [Theory] @@ -228,9 +229,9 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol var binder = new TestBinder(paramTypes: new[] { typeof(int), typeof(string) }, returnType: typeof(bool)); var protocol = new JsonHubProtocol(); - var messages = new List(); - protocol.TryParseMessages(Encoding.UTF8.GetBytes(input), binder, messages); - var ex = Assert.Throws(() => ((HubMethodInvocationMessage)messages[0]).Arguments); + var data = new ReadOnlySequence(Encoding.UTF8.GetBytes(input)); + protocol.TryParseMessage(ref data, binder, out var message); + var ex = Assert.Throws(() => ((HubMethodInvocationMessage)message).Arguments); Assert.Equal(expectedMessage, ex.Message); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs index 5253257b81..b1fccbcc2a 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/MessagePackHubProtocolTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -286,11 +287,11 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol // Parse the input fully now. bytes = Frame(bytes); var protocol = new MessagePackHubProtocol(); - var messages = new List(); - Assert.True(protocol.TryParseMessages(bytes, new TestBinder(testData.Message), messages)); + var data = new ReadOnlySequence(bytes); + Assert.True(protocol.TryParseMessage(ref data, new TestBinder(testData.Message), out var message)); - Assert.Single(messages); - Assert.Equal(testData.Message, messages[0], TestHubMessageEqualityComparer.Instance); + Assert.NotNull(message); + Assert.Equal(testData.Message, message, TestHubMessageEqualityComparer.Instance); } [Fact] @@ -308,11 +309,11 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol // Parse the input fully now. bytes = Frame(bytes); var protocol = new MessagePackHubProtocol(); - var messages = new List(); - Assert.True(protocol.TryParseMessages(bytes, new TestBinder(expectedMessage), messages)); + var data = new ReadOnlySequence(bytes); + Assert.True(protocol.TryParseMessage(ref data, new TestBinder(expectedMessage), out var message)); - Assert.Single(messages); - Assert.Equal(expectedMessage, messages[0], TestHubMessageEqualityComparer.Instance); + Assert.NotNull(message); + Assert.Equal(expectedMessage, message, TestHubMessageEqualityComparer.Instance); } [Theory] @@ -325,7 +326,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol AssertMessages(testData.Encoded, bytes); // Unframe the message to check the binary encoding - ReadOnlyMemory byteSpan = bytes; + var byteSpan = new ReadOnlySequence(bytes); Assert.True(BinaryMessageParser.TryParseMessage(ref byteSpan, out var unframed)); // Check the baseline binary encoding, use Assert.True in order to configure the error message @@ -380,8 +381,8 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol { var buffer = Frame(Pack(testData.Encoded)); var binder = new TestBinder(new[] { typeof(string) }, typeof(string)); - var messages = new List(); - var exception = Assert.Throws(() => _hubProtocol.TryParseMessages(buffer, binder, messages)); + var data = new ReadOnlySequence(buffer); + var exception = Assert.Throws(() => _hubProtocol.TryParseMessage(ref data, binder, out _)); Assert.Equal(testData.ErrorMessage, exception.Message); } @@ -409,22 +410,21 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol { var buffer = Frame(Pack(testData.Encoded)); var binder = new TestBinder(new[] { typeof(string) }, typeof(string)); - var messages = new List(); - _hubProtocol.TryParseMessages(buffer, binder, messages); - var exception = Assert.Throws(() => ((HubMethodInvocationMessage)messages[0]).Arguments); + var data = new ReadOnlySequence(buffer); + _hubProtocol.TryParseMessage(ref data, binder, out var message); + var exception = Assert.Throws(() => ((HubMethodInvocationMessage)message).Arguments); Assert.Equal(testData.ErrorMessage, exception.Message); } [Theory] - [InlineData(new object[] { new byte[] { 0x05, 0x01 }, 0 })] - public void ParserDoesNotConsumePartialData(byte[] payload, int expectedMessagesCount) + [InlineData(new byte[] { 0x05, 0x01 })] + public void ParserDoesNotConsumePartialData(byte[] payload) { var binder = new TestBinder(new[] { typeof(string) }, typeof(string)); - var messages = new List(); - var result = _hubProtocol.TryParseMessages(payload, binder, messages); - Assert.True(result || messages.Count == 0); - Assert.Equal(expectedMessagesCount, messages.Count); + var data = new ReadOnlySequence(payload); + var result = _hubProtocol.TryParseMessage(ref data, binder, out var message); + Assert.Null(message); } [Fact] @@ -434,9 +434,10 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol AssertMessages(Array(HubProtocolConstants.CompletionMessageType, Map(), "0", 3, Array(42)), result); } - private static void AssertMessages(MessagePackObject expectedOutput, ReadOnlyMemory bytes) + private static void AssertMessages(MessagePackObject expectedOutput, byte[] bytes) { - Assert.True(BinaryMessageParser.TryParseMessage(ref bytes, out var message)); + var data = new ReadOnlySequence(bytes); + Assert.True(BinaryMessageParser.TryParseMessage(ref data, out var message)); var obj = Unpack(message.ToArray()); Assert.Equal(expectedOutput, obj); } diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/Utf8BufferTextReaderTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/Utf8BufferTextReaderTests.cs index 08f921b104..511240e6cc 100644 --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/Utf8BufferTextReaderTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Protocol/Utf8BufferTextReaderTests.cs @@ -15,7 +15,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [Fact] public void ReadingWhenCharBufferBigEnough() { - var buffer = Encoding.UTF8.GetBytes("Hello World"); + var buffer = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")); var reader = new Utf8BufferTextReader(); reader.SetBuffer(buffer); @@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [Fact] public void ReadingUnicodeWhenCharBufferBigEnough() { - var buffer = Encoding.UTF8.GetBytes("a\u00E4\u00E4\u00a9o"); + var buffer = new ReadOnlySequence(Encoding.UTF8.GetBytes("a\u00E4\u00E4\u00a9o")); var reader = new Utf8BufferTextReader(); reader.SetBuffer(buffer); @@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [Fact] public void ReadingWhenCharBufferBigEnoughAndNotStartingFromZero() { - var buffer = Encoding.UTF8.GetBytes("Hello World"); + var buffer = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")); var reader = new Utf8BufferTextReader(); reader.SetBuffer(buffer); @@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [Fact] public void ReadingWhenBufferTooSmall() { - var buffer = Encoding.UTF8.GetBytes("Hello World"); + var buffer = new ReadOnlySequence(Encoding.UTF8.GetBytes("Hello World")); var reader = new Utf8BufferTextReader(); reader.SetBuffer(buffer); @@ -92,7 +92,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol [Fact] public void ReadingUnicodeWhenBufferTooSmall() { - var buffer = Encoding.UTF8.GetBytes("\u00E4\u00E4\u00E5"); + var buffer = new ReadOnlySequence(Encoding.UTF8.GetBytes("\u00E4\u00E4\u00E5")); var reader = new Utf8BufferTextReader(); reader.SetBuffer(buffer); diff --git a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/TestClient.cs b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/TestClient.cs index 3a746ce965..580eb44f81 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/TestClient.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/TestClient.cs @@ -27,7 +27,6 @@ namespace Microsoft.AspNetCore.SignalR.Tests private readonly IHubProtocol _protocol; private readonly IInvocationBinder _invocationBinder; private readonly CancellationTokenSource _cts; - private readonly Queue _messages = new Queue(); public DefaultConnectionContext Connection { get; } public Task Connected => ((TaskCompletionSource)Connection.Items["ConnectedTask"]).Task; @@ -84,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests // note that the handshake response might not immediately be readable // e.g. server is waiting for request, times out after configured duration, // and sends response with timeout error - HandshakeResponseMessage = (HandshakeResponseMessage) await ReadAsync(true).OrTimeout(); + HandshakeResponseMessage = (HandshakeResponseMessage)await ReadAsync(true).OrTimeout(); } return connection; @@ -220,51 +219,35 @@ namespace Microsoft.AspNetCore.SignalR.Tests public HubMessage TryRead(bool isHandshake = false) { - if (_messages.Count > 0) - { - return _messages.Dequeue(); - } - if (!Connection.Application.Input.TryRead(out var result)) { return null; } var buffer = result.Buffer; - var consumed = buffer.End; - var examined = consumed; try { if (!isHandshake) { - var messages = new List(); - if (_protocol.TryParseMessages(result.Buffer.ToArray(), _invocationBinder, messages)) + if (_protocol.TryParseMessage(ref buffer, _invocationBinder, out var message)) { - foreach (var m in messages) - { - _messages.Enqueue(m); - } - - return _messages.Dequeue(); + return message; } } else { - HandshakeProtocol.TryReadMessageIntoSingleMemory(buffer, out consumed, out examined, out var data); - - // read first message out of the incoming data - if (!TextMessageParser.TryParseMessage(ref data, out var payload)) + // read first message out of the incoming data + if (!HandshakeProtocol.TryParseResponseMessage(ref buffer, out var responseMessage)) { throw new InvalidDataException("Unable to parse payload as a handshake response message."); } - - return HandshakeProtocol.ParseResponseMessage(payload); + return responseMessage; } } finally { - Connection.Application.Input.AdvanceTo(consumed, examined); + Connection.Application.Input.AdvanceTo(buffer.Start); } return null;