Eliminate temporary MemoryStream buffers used during RenderBatch serialization (#1329)

* Eliminate temporary MemoryStream buffers used during RenderBatch serialization. Fixes #1132

* CR: Fix namespace
This commit is contained in:
Steve Sanderson 2018-08-21 14:08:39 +01:00 committed by GitHub
parent 7a763fc4f6
commit 52813ddf63
3 changed files with 217 additions and 6 deletions

View File

@ -0,0 +1,88 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using MessagePack;
using System;
using System.IO;
namespace Microsoft.AspNetCore.Blazor.Server.Circuits
{
/// <summary>
/// A write-only stream that outputs its data to an underlying expandable
/// buffer in the format for a MessagePack 'Bin32' block. Supports writing
/// into buffers up to 2GB in length.
/// </summary>
internal class MessagePackBinaryBlockStream : Stream
{
// MessagePack Bin32 block
// https://github.com/msgpack/msgpack/blob/master/spec.md#bin-format-family
const int HeaderLength = 5;
private byte[] _buffer;
private int _headerStartOffset;
private int _bodyLength;
public MessagePackBinaryBlockStream(byte[] buffer, int offset)
{
_buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
_headerStartOffset = offset;
_bodyLength = 0;
// Leave space for header
MessagePackBinary.EnsureCapacity(ref _buffer, _headerStartOffset, HeaderLength);
}
public byte[] Buffer => _buffer;
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
// Length is the complete number of bytes being output, including header
public override long Length => _bodyLength + HeaderLength;
// Position is the index into the writable body (i.e., so position zero
// is the first byte you can actually write a value to)
public override long Position
{
get => _bodyLength;
set => throw new NotSupportedException();
}
public override void Flush()
{
// Nothing to do, as we're not buffering separately anyway
}
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotImplementedException();
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotImplementedException();
public override void SetLength(long value)
=> throw new NotImplementedException();
public override void Write(byte[] src, int srcOffset, int count)
{
var outputOffset = _headerStartOffset + HeaderLength + _bodyLength;
MessagePackBinary.EnsureCapacity(ref _buffer, outputOffset, count);
System.Buffer.BlockCopy(src, srcOffset, _buffer, outputOffset, count);
_bodyLength += count;
}
public override void Close()
{
// Write the header into the space we reserved at the beginning
// This format matches the MessagePack spec
unchecked
{
_buffer[_headerStartOffset] = MessagePackCode.Bin32;
_buffer[_headerStartOffset + 1] = (byte)(_bodyLength >> 24);
_buffer[_headerStartOffset + 2] = (byte)(_bodyLength >> 16);
_buffer[_headerStartOffset + 3] = (byte)(_bodyLength >> 8);
_buffer[_headerStartOffset + 4] = (byte)(_bodyLength);
}
}
}
}

View File

@ -29,15 +29,19 @@ namespace Microsoft.AspNetCore.Blazor.Server.Circuits
public int Serialize(ref byte[] bytes, int offset, RenderBatch value, IFormatterResolver formatterResolver)
{
using (var memoryStream = new MemoryStream())
using (var renderBatchWriter = new RenderBatchWriter(memoryStream, leaveOpen: false))
// Instead of using MessagePackBinary.WriteBytes, we write into a stream that
// knows how to format its output as a MessagePack binary block. The benefit
// is that we don't have to allocate a second large buffer to capture the
// RenderBatchWriter output - we can just write directly to the underlying
// output buffer.
using (var binaryBlockStream = new MessagePackBinaryBlockStream(bytes, offset))
using (var renderBatchWriter = new RenderBatchWriter(binaryBlockStream, leaveOpen: false))
{
renderBatchWriter.Write(value);
var bytesBuffer = memoryStream.GetBuffer();
return MessagePackBinary.WriteBytes(ref bytes, offset, bytesBuffer, 0, (int)memoryStream.Length);
bytes = binaryBlockStream.Buffer; // In case the buffer was expanded
return (int)binaryBlockStream.Length;
}
}
}
}

View File

@ -0,0 +1,119 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using MessagePack;
using Microsoft.AspNetCore.Blazor.Server.Circuits;
using System;
using Xunit;
namespace Microsoft.AspNetCore.Blazor.Server
{
public class MessagePackBinaryBlockStreamTest
{
[Fact]
public void NullBuffer_Throws()
{
var ex = Assert.Throws<ArgumentNullException>(() =>
{
new MessagePackBinaryBlockStream(null, 0);
});
Assert.Equal("buffer", ex.ParamName);
}
[Fact]
public void WithNoWrites_JustOutputsHeader()
{
// Arrange
var buffer = new byte[100];
var offset = 58; // Arbitrary
// Act
new MessagePackBinaryBlockStream(buffer, offset).Dispose();
// Assert
Assert.Equal(MessagePackCode.Bin32, buffer[offset]);
Assert.Equal(0, ReadBigEndianInt32(buffer, offset + 1));
}
[Fact]
public void WithWrites_WritesToUnderlyingBuffer()
{
// Arrange
var buffer = new byte[100];
var offset = 58; // Arbitrary
// Act/Assert
using (var stream = new MessagePackBinaryBlockStream(buffer, offset))
{
stream.Write(new byte[] { 10, 20, 30, 40 }, 1, 2); // Write 2 bytes
stream.Write(new byte[] { 101 }, 0, 1); // Write another 1 byte
stream.Close();
Assert.Equal(MessagePackCode.Bin32, buffer[offset]);
Assert.Equal(3, ReadBigEndianInt32(buffer, offset + 1));
Assert.Equal(20, buffer[offset + 5]);
Assert.Equal(30, buffer[offset + 6]);
Assert.Equal(101, buffer[offset + 7]);
}
}
[Fact]
public void LengthIncludesHeaderButPositionDoesNot()
{
// Arrange
var buffer = new byte[20];
var offset = 3;
// Act/Assert
using (var stream = new MessagePackBinaryBlockStream(buffer, offset))
{
stream.Write(new byte[] { 0x01, 0x02 }, 0, 2);
Assert.Equal(7, stream.Length);
Assert.Equal(2, stream.Position);
}
}
[Fact]
public void WithWrites_ExpandsBufferWhenNeeded()
{
// Arrange
var origBuffer = new byte[15];
var offset = 6;
origBuffer[0] = 123; // So we can check it was retained during expansion
// Act/Assert
using (var stream = new MessagePackBinaryBlockStream(origBuffer, offset))
{
// We can fit the 6-byte offset plus 5-byte header plus 3 written bytes
// into the original 15-byte buffer
stream.Write(new byte[] { 10, 20, 30 }, 0, 3);
Assert.Same(origBuffer, stream.Buffer);
// Trying to add two more exceeds the capacity, so the buffer expands
stream.Write(new byte[] { 40, 50 }, 0, 2);
Assert.NotSame(origBuffer, stream.Buffer);
Assert.True(stream.Buffer.Length > origBuffer.Length);
// Check the expanded buffer has the expected contents
stream.Close();
Assert.Equal(123, stream.Buffer[0]); // Retains other values from original buffer
Assert.Equal(MessagePackCode.Bin32, stream.Buffer[offset]);
Assert.Equal(5, ReadBigEndianInt32(stream.Buffer, offset + 1));
Assert.Equal(10, stream.Buffer[offset + 5]);
Assert.Equal(20, stream.Buffer[offset + 6]);
Assert.Equal(30, stream.Buffer[offset + 7]);
Assert.Equal(40, stream.Buffer[offset + 8]);
Assert.Equal(50, stream.Buffer[offset + 9]);
}
}
int ReadBigEndianInt32(byte[] buffer, int startOffset)
{
return (buffer[startOffset] << 24)
+ (buffer[startOffset + 1] << 16)
+ (buffer[startOffset + 2] << 8)
+ (buffer[startOffset + 3]);
}
}
}