Make parsers stateless (#628)

* Make parsers stateless
- Changed parser input from ReadOnlySpan<byte> to ReadOnlyBuffer<byte>
This commit is contained in:
David Fowler 2017-07-01 23:44:22 -07:00 committed by GitHub
parent 10b195bcce
commit 0f4295f90e
11 changed files with 112 additions and 220 deletions

View File

@ -6,69 +6,41 @@ using System.Binary;
namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
{
// Parses length-prefixed binary messages: a big-endian 64-bit length followed by that many payload bytes.
// NOTE(review): this listing shows both the earlier instance-based parser (which buffers partial input in
// _state) and the static, stateless replacement for the same type, interleaved without change markers.
public class BinaryMessageParser
public static class BinaryMessageParser
{
private ParserState _state;
public void Reset()
public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
{
_state = default(ParserState);
}
public bool TryParseMessage(ref ReadOnlySpan<byte> buffer, out ReadOnlyBuffer<byte> payload)
{
if (_state.Length == null)
{
long length = 0;
if (buffer.Length < sizeof(long))
{
payload = default(ReadOnlyBuffer<byte>);
return false;
}
length = buffer.Slice(0, sizeof(long)).ReadBigEndian<long>();
if (length > Int32.MaxValue)
{
throw new FormatException("Messages over 2GB in size are not supported");
}
buffer = buffer.Slice(sizeof(long));
_state.Length = (int)length;
}
if (_state.Payload == null)
{
_state.Payload = new byte[_state.Length.Value];
}
while (_state.Read < _state.Payload.Length && buffer.Length > 0)
{
// Copy what we can from the current unread segment
var toCopy = Math.Min(_state.Payload.Length - _state.Read, buffer.Length);
buffer.Slice(0, toCopy).CopyTo(new Span<byte>(_state.Payload, _state.Read));
_state.Read += toCopy;
buffer = buffer.Slice(toCopy);
}
if (_state.Read == _state.Payload.Length)
{
payload = _state.Payload;
Reset();
return true;
}
// There's still more to read.
long length = 0;
payload = default(ReadOnlyBuffer<byte>);
return false;
}
private struct ParserState
{
public int? Length;
public byte[] Payload;
public int Read;
// Not enough bytes for the length prefix yet; buffer is left untouched
if (buffer.Length < sizeof(long))
{
return false;
}
// Read the length
length = buffer.Span.Slice(0, sizeof(long)).ReadBigEndian<long>();
// NOTE(review): only the upper bound is validated; a negative length would reach the (int) casts
// below unrejected - confirm whether it should also raise FormatException.
if (length > Int32.MaxValue)
{
throw new FormatException("Messages over 2GB in size are not supported");
}
// Skip over the length
var remaining = buffer.Slice(sizeof(long));
// We don't have enough data
// NOTE(review): the body returns unconditionally, so this 'while' runs at most once and behaves like an 'if'.
while (remaining.Length < (int)length)
{
return false;
}
// Get the payload
payload = remaining.Slice(0, (int)length);
// Skip the payload
buffer = remaining.Slice((int)length);
return true;
}
}
}

View File

@ -6,92 +6,77 @@ using System.Text;
namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
{
public class TextMessageParser
public static class TextMessageParser
{
private const int Int32OverflowLength = 10;
private ParserState _state;
/// <summary>
/// Clears the tracked parse state by assigning <c>default(ParserState)</c> to <c>_state</c>.
/// </summary>
public void Reset()
{
_state = default(ParserState);
}
/// <summary>
/// Attempts to parse a message from the buffer. Returns 'false' if there is not enough data to complete a message. Throws an
/// exception if there is a format error in the provided data.
/// </summary>
/// <param name="buffer">
/// Input holding a length, a field delimiter, the payload and a message delimiter. The ReadOnlyBuffer-based
/// version reassigns it only on success, to the data that follows the parsed message.
/// </param>
/// <param name="payload">The payload of the parsed message on success; otherwise the default value.</param>
/// <returns>'true' if a complete message was parsed; otherwise 'false'.</returns>
public bool TryParseMessage(ref ReadOnlySpan<byte> buffer, out ReadOnlyBuffer<byte> payload)
public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
{
while (buffer.Length > 0)
payload = default(ReadOnlyBuffer<byte>);
var span = buffer.Span;
if (!TryReadLength(span, out var index, out var length))
{
switch (_state.Phase)
{
case ParsePhase.ReadingLength:
if (!TryReadLength(ref buffer))
{
payload = default(ReadOnlyBuffer<byte>);
return false;
}
break;
case ParsePhase.LengthComplete:
if (!TryReadDelimiter(ref buffer, TextMessageFormatter.FieldDelimiter, ParsePhase.ReadingPayload, "length"))
{
payload = default(ReadOnlyBuffer<byte>);
return false;
}
break;
case ParsePhase.ReadingPayload:
ReadPayload(ref buffer);
break;
case ParsePhase.PayloadComplete:
if (!TryReadDelimiter(ref buffer, TextMessageFormatter.MessageDelimiter, ParsePhase.ReadingPayload, "payload"))
{
payload = default(ReadOnlyBuffer<byte>);
return false;
}
// We're done!
payload = _state.Payload;
Reset();
return true;
default:
throw new InvalidOperationException($"Invalid parser phase: {_state.Phase}");
}
return false;
}
payload = default(ReadOnlyBuffer<byte>);
return false;
// Continue from the field delimiter position reported by TryReadLength
var remaining = buffer.Slice(index);
span = remaining.Span;
if (!TryReadDelimiter(span, TextMessageFormatter.FieldDelimiter, "length"))
{
return false;
}
// Skip the delimiter
remaining = remaining.Slice(1);
// Require the whole payload plus one more byte for the trailing message delimiter.
// NOTE(review): TryParseInt32 applies a sign to the parsed value, so length may be negative here -
// confirm that negative lengths are rejected before they reach Slice.
if (remaining.Length < length + 1)
{
return false;
}
payload = remaining.Slice(0, length);
remaining = remaining.Slice(length);
if (!TryReadDelimiter(remaining.Span, TextMessageFormatter.MessageDelimiter, "payload"))
{
return false;
}
// Skip the delimiter and hand the unparsed remainder back to the caller
buffer = remaining.Slice(1);
return true;
}
/// <summary>
/// Locates the field delimiter and parses the bytes before it as the message length.
/// Returns 'false' when no field delimiter is present yet; throws a FormatException when the bytes
/// before the delimiter are not entirely consumed as an integer.
/// The static version reports the delimiter position through 'index' and the parsed value through 'length';
/// the instance version instead advances 'buffer' to the delimiter and records the length in _state.
/// </summary>
private bool TryReadLength(ref ReadOnlySpan<byte> buffer)
private static bool TryReadLength(ReadOnlySpan<byte> buffer, out int index, out int length)
{
length = 0;
// Read until the first ':' to find the length
var found = buffer.IndexOf((byte)TextMessageFormatter.FieldDelimiter);
index = buffer.IndexOf((byte)TextMessageFormatter.FieldDelimiter);
if (found == -1)
if (index == -1)
{
// Insufficient data
return false;
}
var lengthSpan = buffer.Slice(0, found);
var lengthSpan = buffer.Slice(0, index);
// Every byte before the delimiter must be part of the number; trailing unparsed bytes are a format error
if (!TryParseInt32(lengthSpan, out var length, out var bytesConsumed) || bytesConsumed < lengthSpan.Length)
if (!TryParseInt32(lengthSpan, out length, out var bytesConsumed) || bytesConsumed < lengthSpan.Length)
{
throw new FormatException($"Invalid length: '{Encoding.UTF8.GetString(lengthSpan.ToArray())}'");
}
buffer = buffer.Slice(found);
_state.Length = length;
_state.Phase = ParsePhase.LengthComplete;
return true;
}
private bool TryReadDelimiter(ref ReadOnlySpan<byte> buffer, char delimiter, ParsePhase nextPhase, string field)
private static bool TryReadDelimiter(ReadOnlySpan<byte> buffer, char delimiter, string field)
{
if (buffer.Length == 0)
{
@ -103,35 +88,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
throw new FormatException($"Missing delimiter '{delimiter}' after {field}");
}
buffer = buffer.Slice(1);
_state.Phase = nextPhase;
return true;
}
/// <summary>
/// Copies payload bytes from <paramref name="buffer"/> into _state.Payload, allocating that array on first use.
/// A call either copies bytes or, when all _state.Length bytes were already read, switches the phase to
/// PayloadComplete; it never does both, so the phase change happens on the call after the final copy.
/// </summary>
private void ReadPayload(ref ReadOnlySpan<byte> buffer)
{
if (_state.Payload == null)
{
_state.Payload = new byte[_state.Length];
}
if (_state.Read == _state.Length)
{
_state.Phase = ParsePhase.PayloadComplete;
}
else
{
// Copy as much as possible from the Unread buffer
var toCopy = Math.Min(_state.Length - _state.Read, buffer.Length);
buffer.Slice(0, toCopy).CopyTo(new Span<byte>(_state.Payload, _state.Read));
_state.Read += toCopy;
// Advance the caller's span past the bytes just copied
buffer = buffer.Slice(toCopy);
}
}
public static bool TryParseInt32(ReadOnlySpan<byte> text, out int value, out int bytesConsumed)
private static bool TryParseInt32(ReadOnlySpan<byte> text, out int value, out int bytesConsumed)
{
if (text.Length < 1)
{
@ -221,21 +181,5 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
value = parsedValue * sign;
return true;
}
// Progress of the message currently being parsed by the instance-based parser.
private struct ParserState
{
// Which part of the message is expected next
public ParsePhase Phase;
// Payload length parsed from the length prefix
public int Length;
// Destination array for the payload bytes; null until ReadPayload allocates it
public byte[] Payload;
// Number of payload bytes copied into Payload so far
public int Read;
}
// Phases of the instance-based parser, in the order TryParseMessage moves through them.
private enum ParsePhase
{
// Zero value, so default(ParserState) starts here: the length prefix is being read
ReadingLength = 0,
// Length parsed; the field delimiter is expected next
LengthComplete,
// Payload bytes are being copied
ReadingPayload,
// All payload bytes copied; the message delimiter is expected next
PayloadComplete
}
}
}

View File

@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
string Name { get; }
bool TryParseMessages(ReadOnlySpan<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
void WriteMessage(HubMessage message, Stream output);
}

View File

@ -44,14 +44,13 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
_payloadSerializer = payloadSerializer;
}
public string Name { get => "json"; }
public string Name => "json";
public bool TryParseMessages(ReadOnlySpan<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();
var parser = new TextMessageParser();
while (parser.TryParseMessage(ref input, out var payload))
while (TextMessageParser.TryParseMessage(ref input, out var payload))
{
// TODO: Need a span-native JSON parser!
using (var memoryStream = new MemoryStream(payload.ToArray()))

View File

@ -16,15 +16,13 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
private const int StreamItemMessageType = 2;
private const int CompletionMessageType = 3;
public string Name { get => "messagepack"; }
public string Name => "messagepack";
public bool TryParseMessages(ReadOnlySpan<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();
var messageParser = new BinaryMessageParser();
while (messageParser.TryParseMessage(ref input, out var payload))
while (BinaryMessageParser.TryParseMessage(ref input, out var payload))
{
using (var memoryStream = new MemoryStream(payload.ToArray()))
{

View File

@ -29,10 +29,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
}
}
public static bool TryParseMessage(ReadOnlySpan<byte> input, out NegotiationMessage negotiationMessage)
public static bool TryParseMessage(ReadOnlyBuffer<byte> input, out NegotiationMessage negotiationMessage)
{
var parser = new TextMessageParser();
if (!parser.TryParseMessage(ref input, out var payload))
if (!TextMessageParser.TryParseMessage(ref input, out var payload))
{
throw new FormatException("Unable to parse payload as a negotiation message.");
}

View File

@ -183,7 +183,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
public string Name { get => "MockHubProtocol"; }
public bool TryParseMessages(ReadOnlySpan<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
{
messages = new List<HubMessage>();

View File

@ -65,8 +65,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
_hubProtocol.WriteMessage(hubMessage, memoryStream);
}
_hubProtocol.TryParseMessages(
new ReadOnlySpan<byte>(memoryStream.ToArray()), new CompositeTestBinder(hubMessages), out var messages);
_hubProtocol.TryParseMessages(memoryStream.ToArray(), new CompositeTestBinder(hubMessages), out var messages);
Assert.Equal(hubMessages, messages, TestHubMessageEqualityComparer.Instance);
}
@ -123,8 +122,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Protocol
Array.Copy(payload, 0, buffer, 8, payloadSize);
var binder = new TestBinder(new[] { typeof(string) }, typeof(string));
var exception = Assert.Throws<FormatException>(() =>
_hubProtocol.TryParseMessages(new ReadOnlySpan<byte>(buffer), binder, out var messages));
var exception = Assert.Throws<FormatException>(() => _hubProtocol.TryParseMessages(buffer, binder, out var messages));
Assert.Equal(expectedExceptionMessage, exception.Message);
}

View File

@ -1,5 +1,4 @@
using System;
using System.Buffers;
using System.IO;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Sockets.Internal.Formatters;
@ -10,8 +9,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
public class MessageParserBenchmark
{
private static readonly Random Random = new Random();
private readonly TextMessageParser _textMessageParser = new TextMessageParser();
private readonly BinaryMessageParser _binaryMessageParser = new BinaryMessageParser();
private ReadOnlyBuffer<byte> _binaryInput;
private ReadOnlyBuffer<byte> _textInput;
@ -42,8 +39,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark]
public void SingleBinaryMessage()
{
var buffer = _binaryInput.Span;
if (!_binaryMessageParser.TryParseMessage(ref buffer, out _))
var buffer = _binaryInput;
if (!BinaryMessageParser.TryParseMessage(ref buffer, out _))
{
throw new InvalidOperationException("Failed to parse");
}
@ -52,8 +49,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
[Benchmark]
public void SingleTextMessage()
{
var buffer = _textInput.Span;
if (!_textMessageParser.TryParseMessage(ref buffer, out _))
var buffer = _textInput;
if (!TextMessageParser.TryParseMessage(ref buffer, out _))
{
throw new InvalidOperationException("Failed to parse");
}

View File

@ -18,9 +18,8 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0B, 0x41, 0x0A, 0x52, 0x0D, 0x43, 0x0D, 0x0A, 0x3B, 0x44, 0x45, 0x46 }, "A\nR\rC\r\n;DEF")]
public void ReadMessage(byte[] encoded, string payload)
{
var parser = new BinaryMessageParser();
ReadOnlySpan<byte> span = encoded.AsSpan();
Assert.True(parser.TryParseMessage(ref span, out var message));
ReadOnlyBuffer<byte> span = encoded;
Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
Assert.Equal(Encoding.UTF8.GetBytes(payload), message.ToArray());
@ -31,9 +30,8 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xAB, 0xCD, 0xEF, 0x12 }, new byte[] { 0xAB, 0xCD, 0xEF, 0x12 })]
// Verifies that a single encoded message parses successfully, that all input is consumed,
// and that the parsed payload equals the expected bytes.
public void ReadBinaryMessage(byte[] encoded, byte[] payload)
{
var parser = new BinaryMessageParser();
ReadOnlySpan<byte> span = encoded.AsSpan();
Assert.True(parser.TryParseMessage(ref span, out var message));
// NOTE(review): this local is still named 'span' although it is declared as ReadOnlyBuffer<byte>.
ReadOnlyBuffer<byte> span = encoded;
Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
Assert.Equal(payload, message.ToArray());
}
@ -48,16 +46,15 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
/* length: */ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0E,
/* body: */ 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x2C, 0x0D, 0x0A, 0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21,
};
var parser = new BinaryMessageParser();
ReadOnlySpan<byte> span = encoded.AsSpan();
ReadOnlyBuffer<byte> buffer = encoded;
var messages = new List<byte[]>();
while (parser.TryParseMessage(ref span, out var message))
while (BinaryMessageParser.TryParseMessage(ref buffer, out var message))
{
messages.Add(message.ToArray());
}
Assert.Equal(0, span.Length);
Assert.Equal(0, buffer.Length);
Assert.Equal(2, messages.Count);
Assert.Equal(new byte[0], messages[0]);
@ -69,10 +66,9 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData(new byte[] { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00 })] // Not enough data for payload
// Verifies that truncated input makes TryParseMessage return false.
// The span-based assertions expect all input to have been consumed; the ReadOnlyBuffer-based
// assertions instead expect the buffer length to be unchanged after the failed parse.
public void ReadIncompleteMessages(byte[] encoded)
{
var parser = new BinaryMessageParser();
ReadOnlySpan<byte> span = encoded.AsSpan();
Assert.False(parser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
ReadOnlyBuffer<byte> buffer = encoded;
Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
Assert.Equal(encoded.Length, buffer.Length);
}
}
}

View File

@ -19,12 +19,10 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData(4, "12:Hello, World;", "Hello, World")]
// Verifies that one encoded text message parses successfully, consumes the whole input,
// and yields the UTF-8 bytes of the expected payload.
// NOTE(review): chunkSize is not referenced anywhere in this body - confirm whether the parameter
// (and the matching InlineData column) is still needed.
public void ReadTextMessage(int chunkSize, string encoded, string payload)
{
var parser = new TextMessageParser();
var buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlySpan<byte> span = buffer.AsSpan();
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
Assert.True(parser.TryParseMessage(ref span, out var message));
Assert.Equal(0, span.Length);
Assert.True(TextMessageParser.TryParseMessage(ref buffer, out var message));
Assert.Equal(0, buffer.Length);
Assert.Equal(Encoding.UTF8.GetBytes(payload), message.ToArray());
}
@ -32,17 +30,15 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
public void ReadMultipleMessages()
{
const string encoded = "0:;14:Hello,\r\nWorld!;";
var parser = new TextMessageParser();
var data = Encoding.UTF8.GetBytes(encoded);
ReadOnlySpan<byte> span = data.AsSpan();
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
var messages = new List<byte[]>();
while (parser.TryParseMessage(ref span, out var message))
while (TextMessageParser.TryParseMessage(ref buffer, out var message))
{
messages.Add(message.ToArray());
}
Assert.Equal(0, span.Length);
Assert.Equal(0, buffer.Length);
Assert.Equal(2, messages.Count);
Assert.Equal(new byte[0], messages[0]);
@ -59,10 +55,8 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData("5:ABCDE")]
// Verifies that an incomplete encoded message makes TryParseMessage return false.
// NOTE(review): the remaining buffer length is not asserted here - consider checking that a failed
// parse leaves the ReadOnlyBuffer untouched.
public void ReadIncompleteMessages(string encoded)
{
var parser = new TextMessageParser();
var buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlySpan<byte> span = buffer.AsSpan();
Assert.False(parser.TryParseMessage(ref span, out _));
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
Assert.False(TextMessageParser.TryParseMessage(ref buffer, out _));
}
[Theory]
@ -73,12 +67,10 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
[InlineData("5:ABCDEF", "Missing delimiter ';' after payload")]
// Verifies that malformed input makes TryParseMessage throw a FormatException whose message
// equals expectedMessage.
public void ReadInvalidMessages(string encoded, string expectedMessage)
{
var parser = new TextMessageParser();
var buffer = Encoding.UTF8.GetBytes(encoded);
ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
var ex = Assert.Throws<FormatException>(() =>
{
ReadOnlySpan<byte> span = buffer.AsSpan();
parser.TryParseMessage(ref span, out _);
TextMessageParser.TryParseMessage(ref buffer, out _);
});
Assert.Equal(expectedMessage, ex.Message);
}
@ -86,16 +78,13 @@ namespace Microsoft.AspNetCore.Sockets.Common.Tests.Internal.Formatters
// Verifies that a length prefix containing an invalid UTF-8 byte is reported through a FormatException.
[Fact]
public void ReadInvalidEncodedMessage()
{
var parser = new TextMessageParser();
// Invalid because the third byte (0x80) is a stray UTF-8 "continuation" byte
// We need to include the ':' so that the parser finds the field delimiter and attempts to parse the length
var buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
ReadOnlyBuffer<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
// NOTE(review): 'reader' is not used in the visible remainder of this test - confirm it can be removed.
var reader = new BytesReader(buffer);
var ex = Assert.Throws<FormatException>(() =>
{
ReadOnlySpan<byte> span = buffer.AsSpan();
parser.TryParseMessage(ref span, out _);
TextMessageParser.TryParseMessage(ref buffer, out _);
});
Assert.Equal("Invalid length: 'He<48>lo'", ex.Message);
}