HttpRequestStreamReader overrides for Read Span, ReadAsync Memory, ReadLine and ReadLineAsync (#18802)

This commit is contained in:
Alessio Franceschelli 2020-04-13 16:18:12 +01:00 committed by GitHub
parent ce85f28672
commit e3d3da3546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 458 additions and 59 deletions

View File

@ -0,0 +1,52 @@
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
namespace Microsoft.AspNetCore.WebUtilities
{
public class HttpRequestStreamReaderReadLineBenchmark
{
private MemoryStream _stream;
[Params(200, 1000, 1025, 1600)] // Default buffer length is 1024
public int Length { get; set; }
[GlobalSetup]
public void GlobalSetup()
{
var data = new char[Length];
data[Length - 2] = '\r';
data[Length - 1] = '\n';
_stream = new MemoryStream(Encoding.UTF8.GetBytes(data));
}
[Benchmark]
public async Task<string> ReadLineAsync()
{
var reader = CreateReader();
var result = await reader.ReadLineAsync();
Debug.Assert(result.Length == Length - 2);
return result;
}
[Benchmark]
public string ReadLine()
{
var reader = CreateReader();
var result = reader.ReadLine();
Debug.Assert(result.Length == Length - 2);
return result;
}
[Benchmark]
public HttpRequestStreamReader CreateReader()
{
_stream.Seek(0, SeekOrigin.Begin);
return new HttpRequestStreamReader(_stream, Encoding.UTF8);
}
}
}

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>$(DefaultNetCoreTargetFramework)</TargetFramework>

View File

@ -145,8 +145,13 @@ namespace Microsoft.AspNetCore.WebUtilities
public override int Peek() { throw null; }
public override int Read() { throw null; }
public override int Read(char[] buffer, int index, int count) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public override int Read(System.Span<char> buffer) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(char[] buffer, int index, int count) { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override string ReadLine() { throw null; }
[System.Diagnostics.DebuggerStepThroughAttribute]
public override System.Threading.Tasks.Task<string> ReadLineAsync() { throw null; }
}
public partial class HttpResponseStreamWriter : System.IO.TextWriter
{

View File

@ -6,6 +6,7 @@ using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.WebUtilities
@ -154,11 +155,23 @@ namespace Microsoft.AspNetCore.WebUtilities
throw new ArgumentOutOfRangeException(nameof(count));
}
var span = new Span<char>(buffer, index, count);
return Read(span);
}
public override int Read(Span<char> buffer)
{
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}
if (_disposed)
{
throw new ObjectDisposedException(nameof(HttpRequestStreamReader));
}
var count = buffer.Length;
var charsRead = 0;
while (count > 0)
{
@ -178,17 +191,16 @@ namespace Microsoft.AspNetCore.WebUtilities
charsRemaining = count;
}
Buffer.BlockCopy(
_charBuffer,
_charBufferIndex * 2,
buffer,
(index + charsRead) * 2,
charsRemaining * 2);
var source = new ReadOnlySpan<char>(_charBuffer, _charBufferIndex, charsRemaining);
source.CopyTo(buffer);
_charBufferIndex += charsRemaining;
charsRead += charsRemaining;
count -= charsRemaining;
buffer = buffer.Slice(charsRemaining, count);
// If we got back fewer chars than we asked for, then it's likely the underlying stream is blocked.
// Send the data back to the caller so they can process it.
if (_isBlocked)
@ -200,7 +212,7 @@ namespace Microsoft.AspNetCore.WebUtilities
return charsRead;
}
public override async Task<int> ReadAsync(char[] buffer, int index, int count)
public override Task<int> ReadAsync(char[] buffer, int index, int count)
{
if (buffer == null)
{
@ -217,6 +229,12 @@ namespace Microsoft.AspNetCore.WebUtilities
throw new ArgumentOutOfRangeException(nameof(count));
}
var memory = new Memory<char>(buffer, index, count);
return ReadAsync(memory).AsTask();
}
public override async ValueTask<int> ReadAsync(Memory<char> buffer, CancellationToken cancellationToken = default)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(HttpRequestStreamReader));
@ -227,14 +245,16 @@ namespace Microsoft.AspNetCore.WebUtilities
return 0;
}
var count = buffer.Length;
var charsRead = 0;
while (count > 0)
{
// n is the characters available in _charBuffer
var n = _charsRead - _charBufferIndex;
var charsRemaining = _charsRead - _charBufferIndex;
// charBuffer is empty, let's read from the stream
if (n == 0)
if (charsRemaining == 0)
{
_charsRead = 0;
_charBufferIndex = 0;
@ -244,7 +264,7 @@ namespace Microsoft.AspNetCore.WebUtilities
// We break out of the loop if the stream is blocked (EOF is reached).
do
{
Debug.Assert(n == 0);
Debug.Assert(charsRemaining == 0);
_bytesRead = await _stream.ReadAsync(
_byteBuffer,
0,
@ -258,45 +278,43 @@ namespace Microsoft.AspNetCore.WebUtilities
// _isBlocked == whether we read fewer bytes than we asked for.
_isBlocked = (_bytesRead < _byteBufferSize);
Debug.Assert(n == 0);
Debug.Assert(charsRemaining == 0);
_charBufferIndex = 0;
n = _decoder.GetChars(
charsRemaining = _decoder.GetChars(
_byteBuffer,
0,
_bytesRead,
_charBuffer,
0);
Debug.Assert(n > 0);
Debug.Assert(charsRemaining > 0);
_charsRead += n; // Number of chars in StreamReader's buffer.
_charsRead += charsRemaining; // Number of chars in StreamReader's buffer.
}
while (n == 0);
while (charsRemaining == 0);
if (n == 0)
if (charsRemaining == 0)
{
break; // We're at EOF
}
}
// Got more chars in charBuffer than the user requested
if (n > count)
if (charsRemaining > count)
{
n = count;
charsRemaining = count;
}
Buffer.BlockCopy(
_charBuffer,
_charBufferIndex * 2,
buffer,
(index + charsRead) * 2,
n * 2);
var source = new Memory<char>(_charBuffer, _charBufferIndex, charsRemaining);
source.CopyTo(buffer);
_charBufferIndex += n;
_charBufferIndex += charsRemaining;
charsRead += n;
count -= n;
charsRead += charsRemaining;
count -= charsRemaining;
buffer = buffer.Slice(charsRemaining, count);
// This function shouldn't block for an indefinite amount of time,
// or reading from a network stream won't work right. If we got
@ -310,6 +328,146 @@ namespace Microsoft.AspNetCore.WebUtilities
return charsRead;
}
public override async Task<string> ReadLineAsync()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(HttpRequestStreamReader));
}
StringBuilder sb = null;
var consumeLineFeed = false;
while (true)
{
if (_charBufferIndex == _charsRead)
{
if (await ReadIntoBufferAsync() == 0)
{
// reached EOF, we need to return null if we were at EOF from the beginning
return sb?.ToString();
}
}
var stepResult = ReadLineStep(ref sb, ref consumeLineFeed);
if (stepResult.Completed)
{
return stepResult.Result ?? sb?.ToString();
}
continue;
}
}
// Reads a line. A line is defined as a sequence of characters followed by
// a carriage return ('\r'), a line feed ('\n'), or a carriage return
// immediately followed by a line feed. The resulting string does not
// contain the terminating carriage return and/or line feed. The returned
// value is null if the end of the input stream has been reached.
public override string ReadLine()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(HttpRequestStreamReader));
}
StringBuilder sb = null;
var consumeLineFeed = false;
while (true)
{
if (_charBufferIndex == _charsRead)
{
if (ReadIntoBuffer() == 0)
{
// reached EOF, we need to return null if we were at EOF from the beginning
return sb?.ToString();
}
}
var stepResult = ReadLineStep(ref sb, ref consumeLineFeed);
if (stepResult.Completed)
{
return stepResult.Result ?? sb?.ToString();
}
}
}
private ReadLineStepResult ReadLineStep(ref StringBuilder sb, ref bool consumeLineFeed)
{
const char carriageReturn = '\r';
const char lineFeed = '\n';
if (consumeLineFeed)
{
if (_charBuffer[_charBufferIndex] == lineFeed)
{
_charBufferIndex++;
}
return ReadLineStepResult.Done;
}
var span = new Span<char>(_charBuffer, _charBufferIndex, _charsRead - _charBufferIndex);
var index = span.IndexOfAny(carriageReturn, lineFeed);
if (index != -1)
{
if (span[index] == carriageReturn)
{
span = span.Slice(0, index);
_charBufferIndex += index + 1;
if (_charBufferIndex < _charsRead)
{
// consume following line feed
if (_charBuffer[_charBufferIndex] == lineFeed)
{
_charBufferIndex++;
}
if (sb != null)
{
sb.Append(span);
return ReadLineStepResult.Done;
}
// perf: if the new line is found in first pass, we skip the StringBuilder
return ReadLineStepResult.FromResult(span.ToString());
}
// we where at the end of buffer, we need to read more to check for a line feed to consume
sb ??= new StringBuilder();
sb.Append(span);
consumeLineFeed = true;
return ReadLineStepResult.Continue;
}
if (span[index] == lineFeed)
{
span = span.Slice(0, index);
_charBufferIndex += index + 1;
if (sb != null)
{
sb.Append(span);
return ReadLineStepResult.Done;
}
// perf: if the new line is found in first pass, we skip the StringBuilder
return ReadLineStepResult.FromResult(span.ToString());
}
}
sb ??= new StringBuilder();
sb.Append(span);
_charBufferIndex = _charsRead;
return ReadLineStepResult.Continue;
}
private int ReadIntoBuffer()
{
_charsRead = 0;
@ -345,7 +503,6 @@ namespace Microsoft.AspNetCore.WebUtilities
do
{
_bytesRead = await _stream.ReadAsync(
_byteBuffer,
0,
@ -370,5 +527,22 @@ namespace Microsoft.AspNetCore.WebUtilities
return _charsRead;
}
private readonly struct ReadLineStepResult
{
public static readonly ReadLineStepResult Done = new ReadLineStepResult(true, null);
public static readonly ReadLineStepResult Continue = new ReadLineStepResult(false, null);
public static ReadLineStepResult FromResult(string value) => new ReadLineStepResult(true, value);
private ReadLineStepResult(bool completed, string result)
{
Completed = completed;
Result = result;
}
public bool Completed { get; }
public string Result { get; }
}
}
}
}

View File

@ -13,7 +13,7 @@ using Xunit;
namespace Microsoft.AspNetCore.WebUtilities
{
public class HttpResponseStreamReaderTest
public class HttpRequestStreamReaderTest
{
private static readonly char[] CharData = new char[]
{
@ -118,7 +118,7 @@ namespace Microsoft.AspNetCore.WebUtilities
}
[Fact]
public static async Task Read_ReadInTwoChunks()
public static async Task ReadAsync_ReadInTwoChunks()
{
// Arrange
var reader = CreateReader();
@ -135,29 +135,31 @@ namespace Microsoft.AspNetCore.WebUtilities
}
}
[Fact]
public static void ReadLine_ReadMultipleLines()
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_ReadMultipleLines(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var reader = CreateReader();
var valueString = new string(CharData);
// Act & Assert
var data = reader.ReadLine();
var data = await action(reader);
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);
data = reader.ReadLine();
data = await action(reader);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);
data = reader.ReadLine();
data = await action(reader);
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);
data = reader.ReadLine();
data = await action(reader);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}
[Fact]
public static void ReadLine_ReadWithNoNewlines()
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_ReadWithNoNewlines(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var reader = CreateReader();
@ -166,33 +168,165 @@ namespace Microsoft.AspNetCore.WebUtilities
// Act
reader.Read(temp, 0, 1);
var data = reader.ReadLine();
var data = await action(reader);
// Assert
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}
[Fact]
public static async Task ReadLineAsync_MultipleContinuousLines()
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_MultipleContinuousLines(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var stream = new MemoryStream();
var writer = new StreamWriter(stream);
writer.Write("\n\n\r\r\n");
writer.Write("\n\n\r\r\n\r");
writer.Flush();
stream.Position = 0;
var reader = new HttpRequestStreamReader(stream, Encoding.UTF8);
// Act & Assert
for (var i = 0; i < 4; i++)
for (var i = 0; i < 5; i++)
{
var data = await reader.ReadLineAsync();
var data = await action(reader);
Assert.Equal(string.Empty, data);
}
var eol = await reader.ReadLineAsync();
Assert.Null(eol);
var eof = await action(reader);
Assert.Null(eof);
}
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_CarriageReturnAndLineFeedAcrossBufferBundaries(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var stream = new MemoryStream();
var writer = new StreamWriter(stream);
writer.Write("123456789\r\nfoo");
writer.Flush();
stream.Position = 0;
var reader = new HttpRequestStreamReader(stream, Encoding.UTF8, 10);
// Act & Assert
var data = await action(reader);
Assert.Equal("123456789", data);
data = await action(reader);
Assert.Equal("foo", data);
var eof = await action(reader);
Assert.Null(eof);
}
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_EOF(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var stream = new MemoryStream();
var reader = new HttpRequestStreamReader(stream, Encoding.UTF8);
// Act & Assert
var eof = await action(reader);
Assert.Null(eof);
}
[Theory]
[MemberData(nameof(ReadLineData))]
public static async Task ReadLine_NewLineOnly(Func<HttpRequestStreamReader, Task<string>> action)
{
// Arrange
var stream = new MemoryStream();
var writer = new StreamWriter(stream);
writer.Write("\r\n");
writer.Flush();
stream.Position = 0;
var reader = new HttpRequestStreamReader(stream, Encoding.UTF8);
// Act & Assert
var empty = await action(reader);
Assert.Equal(string.Empty, empty);
}
[Fact]
public static void Read_Span_ReadAllCharactersAtOnce()
{
// Arrange
var reader = CreateReader();
var chars = new char[CharData.Length];
var span = new Span<char>(chars);
// Act
var read = reader.Read(span);
// Assert
Assert.Equal(chars.Length, read);
for (var i = 0; i < CharData.Length; i++)
{
Assert.Equal(CharData[i], chars[i]);
}
}
[Fact]
public static void Read_Span_WithMoreDataThanInternalBufferSize()
{
// Arrange
var reader = CreateReader(10);
var chars = new char[CharData.Length];
var span = new Span<char>(chars);
// Act
var read = reader.Read(span);
// Assert
Assert.Equal(chars.Length, read);
for (var i = 0; i < CharData.Length; i++)
{
Assert.Equal(CharData[i], chars[i]);
}
}
[Fact]
public async static Task ReadAsync_Memory_ReadAllCharactersAtOnce()
{
// Arrange
var reader = CreateReader();
var chars = new char[CharData.Length];
var memory = new Memory<char>(chars);
// Act
var read = await reader.ReadAsync(memory);
// Assert
Assert.Equal(chars.Length, read);
for (var i = 0; i < CharData.Length; i++)
{
Assert.Equal(CharData[i], chars[i]);
}
}
[Fact]
public async static Task ReadAsync_Memory_WithMoreDataThanInternalBufferSize()
{
// Arrange
var reader = CreateReader(10);
var chars = new char[CharData.Length];
var memory = new Memory<char>(chars);
// Act
var read = await reader.ReadAsync(memory);
// Assert
Assert.Equal(chars.Length, read);
for (var i = 0; i < CharData.Length; i++)
{
Assert.Equal(CharData[i], chars[i]);
}
}
[Theory]
@ -205,8 +339,6 @@ namespace Microsoft.AspNetCore.WebUtilities
});
}
[Theory]
[InlineData(0)]
[InlineData(-1)]
@ -242,26 +374,36 @@ namespace Microsoft.AspNetCore.WebUtilities
});
}
[Fact]
public static async Task StreamDisposed_ExpectObjectDisposedExceptionAsync()
[Theory]
[MemberData(nameof(HttpRequestDisposeDataAsync))]
public static async Task StreamDisposed_ExpectObjectDisposedExceptionAsync(Func<HttpRequestStreamReader, Task> action)
{
var httpRequestStreamReader = new HttpRequestStreamReader(new MemoryStream(), Encoding.UTF8, 10, ArrayPool<byte>.Shared, ArrayPool<char>.Shared);
httpRequestStreamReader.Dispose();
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
{
return httpRequestStreamReader.ReadAsync(new char[10], 0, 1);
});
await Assert.ThrowsAsync<ObjectDisposedException>(() => action(httpRequestStreamReader));
}
private static HttpRequestStreamReader CreateReader()
{
MemoryStream stream = CreateStream();
return new HttpRequestStreamReader(stream, Encoding.UTF8);
}
private static HttpRequestStreamReader CreateReader(int bufferSize)
{
MemoryStream stream = CreateStream();
return new HttpRequestStreamReader(stream, Encoding.UTF8, bufferSize);
}
private static MemoryStream CreateStream()
{
var stream = new MemoryStream();
var writer = new StreamWriter(stream);
writer.Write(CharData);
writer.Flush();
stream.Position = 0;
return new HttpRequestStreamReader(stream, Encoding.UTF8);
return stream;
}
private static MemoryStream GetSmallStream()
@ -302,6 +444,10 @@ namespace Microsoft.AspNetCore.WebUtilities
{
var res = httpRequestStreamReader.Read(new char[10], 0, 1);
})};
yield return new object[] { new Action<HttpRequestStreamReader>((httpRequestStreamReader) =>
{
var res = httpRequestStreamReader.Read(new Span<char>(new char[10], 0, 1));
})};
yield return new object[] { new Action<HttpRequestStreamReader>((httpRequestStreamReader) =>
{
@ -309,5 +455,27 @@ namespace Microsoft.AspNetCore.WebUtilities
})};
}
public static IEnumerable<object[]> HttpRequestDisposeDataAsync()
{
yield return new object[] { new Func<HttpRequestStreamReader, Task>(async (httpRequestStreamReader) =>
{
await httpRequestStreamReader.ReadAsync(new char[10], 0, 1);
})};
yield return new object[] { new Func<HttpRequestStreamReader, Task>(async (httpRequestStreamReader) =>
{
await httpRequestStreamReader.ReadAsync(new Memory<char>(new char[10], 0, 1));
})};
}
public static IEnumerable<object[]> ReadLineData()
{
yield return new object[] { new Func<HttpRequestStreamReader, Task<string>>((httpRequestStreamReader) =>
Task.FromResult(httpRequestStreamReader.ReadLine())
)};
yield return new object[] { new Func<HttpRequestStreamReader, Task<string>>((httpRequestStreamReader) =>
httpRequestStreamReader.ReadLineAsync()
)};
}
}
}