Code review feedback for transcoding Streams (#12142)

This commit is contained in:
Pranav K 2019-07-19 16:01:53 -07:00 committed by GitHub
parent d992a1e2ef
commit 533ed4240c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 107 deletions

View File

@ -5,6 +5,7 @@ using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Formatters.Json;
@ -65,13 +66,21 @@ namespace Microsoft.AspNetCore.Mvc.Formatters
// the behavior you get when the user does not declare the return type and with Json.Net at least at the top level.
var objectType = context.Object?.GetType() ?? context.ObjectType;
await JsonSerializer.SerializeAsync(writeStream, context.Object, objectType, SerializerOptions);
// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}

View File

@ -6,6 +6,7 @@ using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;
@ -13,12 +14,12 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
internal sealed class TranscodingReadStream : Stream
{
private static readonly int OverflowBufferSize = Encoding.UTF8.GetMaxByteCount(1); // The most number of bytes used to represent a single UTF char
internal const int MaxByteBufferSize = 4096;
internal const int MaxCharBufferSize = 3 * MaxByteBufferSize;
private static readonly int MaxByteCountForUTF8Char = Encoding.UTF8.GetMaxByteCount(charCount: 1);
private readonly Stream _stream;
private readonly Encoder _encoder;
private readonly Decoder _decoder;
private ArraySegment<byte> _byteBuffer;
@ -48,11 +49,10 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
count: 0);
_overflowBuffer = new ArraySegment<byte>(
ArrayPool<byte>.Shared.Rent(MaxByteCountForUTF8Char),
ArrayPool<byte>.Shared.Rent(OverflowBufferSize),
0,
count: 0);
_encoder = Encoding.UTF8.GetEncoder();
_decoder = sourceEncoding.GetDecoder();
}
@ -60,7 +60,12 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get; set; }
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
internal int ByteBufferCount => _byteBuffer.Count;
internal int CharBufferCount => _charBuffer.Count;
@ -76,6 +81,11 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
ThrowArgumentOutOfRangeException(buffer, offset, count);
if (count == 0)
{
return 0;
}
var readBuffer = new ArraySegment<byte>(buffer, offset, count);
if (_overflowBuffer.Count > 0)
@ -90,76 +100,55 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
return bytesToCopy;
}
var totalBytes = 0;
bool encoderCompleted;
int bytesEncoded;
do
if (_charBuffer.Count == 0)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
var eof = false;
if (_charBuffer.Count == 0)
{
// Only read more content from the input stream if we have exhausted all the buffered chars.
eof = await ReadInputChars(cancellationToken);
}
// Only read more content from the input stream if we have exhausted all the buffered chars.
await ReadInputChars(cancellationToken);
}
// We need to flush on the last write. This is true when we exhaust the input Stream and any buffered content.
var allContentRead = eof && _charBuffer.Count == 0 && _byteBuffer.Count == 0;
var operationStatus = Utf8.FromUtf16(_charBuffer, readBuffer, out var charsRead, out var bytesWritten, isFinalBlock: false);
_charBuffer = _charBuffer.Slice(charsRead);
if (_charBuffer.Count > 0 && readBuffer.Count < MaxByteCountForUTF8Char && readBuffer.Count < Encoding.UTF8.GetByteCount(_charBuffer.AsSpan(0, 1)))
{
// It's possible that the passed in buffer is smaller than the size required to encode a single
// char. For instance, the JsonSerializer may pass in a buffer of size 1 or 2 which
// is insufficient if the character requires more than 2 bytes to represent. In this case, read
// content in to an overflow buffer and fill up the passed in buffer.
_encoder.Convert(
_charBuffer,
_overflowBuffer.Array,
flush: false,
out var charsUsed,
out var bytesUsed,
out _);
switch (operationStatus)
{
case OperationStatus.Done:
return bytesWritten;
_charBuffer = _charBuffer.Slice(charsUsed);
case OperationStatus.DestinationTooSmall:
if (bytesWritten != 0)
{
return bytesWritten;
}
Debug.Assert(readBuffer.Count < bytesUsed);
// Overflow buffer is always empty when we get here and we can use it's full length to write contents to.
Utf8.FromUtf16(_charBuffer, _overflowBuffer.Array, out var overFlowChars, out var overflowBytes, isFinalBlock: false);
Debug.Assert(overflowBytes > 0 && overFlowChars > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accomodate at least one char.");
_charBuffer = _charBuffer.Slice(overFlowChars);
// readBuffer: [ 0, 0, ], overflowBuffer: [ 7, 13, 34, ]
// Fill up the readBuffer to capacity, so the result looks like so:
// readBuffer: [ 7, 13 ], overflowBuffer: [ 34 ]
Debug.Assert(readBuffer.Count < overflowBytes);
_overflowBuffer.Array.AsSpan(0, readBuffer.Count).CopyTo(readBuffer);
_overflowBuffer = new ArraySegment<byte>(
_overflowBuffer.Array,
readBuffer.Count,
bytesUsed - readBuffer.Count);
overflowBytes - readBuffer.Count);
totalBytes += readBuffer.Count;
// At this point we're done writing.
break;
}
else
{
_encoder.Convert(
_charBuffer,
readBuffer,
flush: allContentRead,
out var charsUsed,
out bytesEncoded,
out encoderCompleted);
Debug.Assert(_overflowBuffer.Count != 0);
totalBytes += bytesEncoded;
_charBuffer = _charBuffer.Slice(charsUsed);
readBuffer = readBuffer.Slice(bytesEncoded);
}
return readBuffer.Count;
// We need to exit in one of the 2 conditions:
// * encoderCompleted will return false if "buffer" was too small for all the chars to be encoded.
// * no bytes were converted in an iteration. This can occur if there wasn't any input.
} while (encoderCompleted && bytesEncoded > 0);
return totalBytes;
default:
Debug.Fail("We should never see this");
throw new InvalidOperationException();
}
}
private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken)
private async Task ReadInputChars(CancellationToken cancellationToken)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
@ -184,15 +173,12 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
out _);
_byteBuffer = _byteBuffer.Slice(bytesUsed);
_charBuffer = new ArraySegment<char>(_charBuffer.Array, 0, charsUsed);
return readBytes == 0;
}
private static void ThrowArgumentOutOfRangeException(byte[] buffer, int offset, int count)
{
if (count <= 0)
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}

View File

@ -49,7 +49,6 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await WriteAsync(ArraySegment<byte>.Empty, flush: true, cancellationToken);
await _stream.FlushAsync(cancellationToken);
}
@ -73,12 +72,11 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
ThrowArgumentException(buffer, offset, count);
var bufferSegment = new ArraySegment<byte>(buffer, offset, count);
return WriteAsync(bufferSegment, flush: false, cancellationToken);
return WriteAsync(bufferSegment, cancellationToken);
}
private async Task WriteAsync(
ArraySegment<byte> bufferSegment,
bool flush,
CancellationToken cancellationToken)
{
var decoderCompleted = false;
@ -87,7 +85,7 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
_decoder.Convert(
bufferSegment,
_charBuffer.AsSpan(_charsDecoded),
flush,
flush: false,
out var bytesDecoded,
out var charsDecoded,
out decoderCompleted);
@ -95,42 +93,35 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
_charsDecoded += charsDecoded;
bufferSegment = bufferSegment.Slice(bytesDecoded);
if (flush || !decoderCompleted)
if (!decoderCompleted)
{
// This is being invoked from FlushAsync or the char buffer is not large enough
// to accomodate all writes.
await WriteBufferAsync(flush, cancellationToken);
await WriteBufferAsync(cancellationToken);
}
}
}
private async Task WriteBufferAsync(bool flush, CancellationToken cancellationToken)
private async Task WriteBufferAsync(CancellationToken cancellationToken)
{
var encoderCompletd = false;
var encoderCompleted = false;
var charsWritten = 0;
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
try
while (!encoderCompleted && charsWritten < _charsDecoded)
{
while (!encoderCompletd && charsWritten < _charsDecoded)
{
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush,
out var charsEncoded,
out var bytesUsed,
out encoderCompletd);
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush: false,
out var charsEncoded,
out var bytesUsed,
out encoderCompleted);
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}
ArrayPool<byte>.Shared.Return(byteBuffer);
// At this point, we've written all the buffered chars to the underlying Stream.
_charsDecoded = 0;
}
@ -161,5 +152,30 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
ArrayPool<char>.Shared.Return(_charBuffer);
}
}
public async Task FinalWriteAsync(CancellationToken cancellationToken)
{
// First write any buffered content
await WriteBufferAsync(cancellationToken);
// Now flush the encoder.
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
var encoderCompleted = false;
while (!encoderCompleted)
{
_encoder.Convert(
Array.Empty<char>(),
byteBuffer,
flush: true,
out _,
out var bytesUsed,
out encoderCompleted);
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
}
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}

View File

@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Core;
@ -83,13 +84,21 @@ namespace Microsoft.AspNetCore.Mvc.Infrastructure
var type = value?.GetType() ?? typeof(object);
await JsonSerializer.SerializeAsync(writeStream, value, type, jsonSerializerOptions);
// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}

View File

@ -1,7 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
// Arrange
var input = "Hello world";
var encoding = Encoding.Unicode;
var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
using var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[4];
// Act
@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
// Arrange
var input = "Hello world";
var encoding = Encoding.Unicode;
var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
using var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[3];
var expected = Encoding.UTF8.GetBytes(input.Substring(0, bytes.Length));
@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
// Arrange
var input = new string('A', 1024 + 10);
var encoding = Encoding.Unicode;
var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
using var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[1024];
var expected = Encoding.UTF8.GetBytes(input.Substring(0, bytes.Length));
@ -88,27 +88,69 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
// Arrange
// Test ensures that the overflow buffer works correctly
var input = new string('A', 4096 + 4);
var input = "☀";
var encoding = Encoding.Unicode;
var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[4096];
var expected = Encoding.UTF8.GetBytes(input.Substring(0, bytes.Length));
using var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[1];
var expected = Encoding.UTF8.GetBytes(input);
// Act
var readBytes = await stream.ReadAsync(bytes, 0, bytes.Length);
// Assert
Assert.Equal(bytes.Length, readBytes);
Assert.Equal(expected, bytes);
Assert.Equal(1, readBytes);
Assert.Equal(expected[0], bytes[0]);
Assert.Equal(0, stream.ByteBufferCount);
Assert.Equal(0, stream.CharBufferCount);
Assert.Equal(4, stream.OverflowCount);
Assert.Equal(2, stream.OverflowCount);
bytes = new byte[expected.Length - 1];
readBytes = await stream.ReadAsync(bytes, 0, bytes.Length);
Assert.Equal(4, readBytes);
Assert.Equal(bytes.Length, readBytes);
Assert.Equal(0, stream.ByteBufferCount);
Assert.Equal(0, stream.CharBufferCount);
Assert.Equal(0, stream.OverflowCount);
readBytes = await stream.ReadAsync(bytes, 0, bytes.Length);
Assert.Equal(0, readBytes);
}
public static TheoryData<string> ReadAsync_WithOverflowBuffer_AtBoundariesData => new TheoryData<string>
{
new string('a', TranscodingReadStream.MaxCharBufferSize - 1) + "☀",
new string('a', TranscodingReadStream.MaxCharBufferSize - 2) + "☀",
new string('a', TranscodingReadStream.MaxCharBufferSize) + "☀",
};
[Theory]
[MemberData(nameof(ReadAsync_WithOverflowBuffer_AtBoundariesData))]
public Task ReadAsync_WithOverflowBuffer_WithBufferSize1(string input) => ReadAsync_WithOverflowBufferAtCharBufferBoundaries(input, bufferSize: 1);
[Theory]
[MemberData(nameof(ReadAsync_WithOverflowBuffer_AtBoundariesData))]
public Task ReadAsync_WithOverflowBuffer_WithBufferSize2(string input) => ReadAsync_WithOverflowBufferAtCharBufferBoundaries(input, bufferSize: 1);
private static async Task<TranscodingReadStream> ReadAsync_WithOverflowBufferAtCharBufferBoundaries(string input, int bufferSize)
{
// Arrange
// Test ensures that the overflow buffer works correctly
var encoding = Encoding.Unicode;
var stream = new TranscodingReadStream(new MemoryStream(encoding.GetBytes(input)), encoding);
var bytes = new byte[1];
var expected = Encoding.UTF8.GetBytes(input);
// Act
int read;
var buffer = new byte[bufferSize];
var actual = new List<byte>();
while ((read = await stream.ReadAsync(buffer)) != 0)
{
actual.AddRange(buffer);
}
Assert.Equal(expected, actual);
return stream;
}
public static TheoryData ReadAsyncInputLatin =>

View File

@ -72,6 +72,7 @@ namespace Microsoft.AspNetCore.Mvc.Formatters.Json
var transcodingStream = new TranscodingWriteStream(stream, targetEncoding);
await JsonSerializer.SerializeAsync(transcodingStream, model, model.GetType());
await transcodingStream.FinalWriteAsync(default);
await transcodingStream.FlushAsync();
var actual = targetEncoding.GetString(stream.ToArray());