diff --git a/src/Microsoft.AspNetCore.Blazor.Server/Circuits/MessagePackBinaryBlockStream.cs b/src/Microsoft.AspNetCore.Blazor.Server/Circuits/MessagePackBinaryBlockStream.cs new file mode 100644 index 0000000000..6d0b0b15ab --- /dev/null +++ b/src/Microsoft.AspNetCore.Blazor.Server/Circuits/MessagePackBinaryBlockStream.cs @@ -0,0 +1,88 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using MessagePack; +using System; +using System.IO; + +namespace Microsoft.AspNetCore.Blazor.Server.Circuits +{ + /// + /// A write-only stream that outputs its data to an underlying expandable + /// buffer in the format for a MessagePack 'Bin32' block. Supports writing + /// into buffers up to 2GB in length. + /// + internal class MessagePackBinaryBlockStream : Stream + { + // MessagePack Bin32 block + // https://github.com/msgpack/msgpack/blob/master/spec.md#bin-format-family + const int HeaderLength = 5; + + private byte[] _buffer; + private int _headerStartOffset; + private int _bodyLength; + + public MessagePackBinaryBlockStream(byte[] buffer, int offset) + { + _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer)); + _headerStartOffset = offset; + _bodyLength = 0; + + // Leave space for header + MessagePackBinary.EnsureCapacity(ref _buffer, _headerStartOffset, HeaderLength); + } + + public byte[] Buffer => _buffer; + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + + // Length is the complete number of bytes being output, including header + public override long Length => _bodyLength + HeaderLength; + + // Position is the index into the writable body (i.e., so position zero + // is the first byte you can actually write a value to) + public override long Position + { + get => _bodyLength; + set => throw new NotSupportedException(); + } + + public override void Flush() + { + // Nothing to do, as we're not buffering separately anyway + } + + public override int Read(byte[] buffer, int offset, int count) + => throw new NotImplementedException(); + + public override long Seek(long offset, SeekOrigin origin) + => throw new NotImplementedException(); + + public override void SetLength(long value) + => throw new NotImplementedException(); + + public override void Write(byte[] src, int srcOffset, int count) + { + var outputOffset = _headerStartOffset + HeaderLength + _bodyLength; + MessagePackBinary.EnsureCapacity(ref _buffer, outputOffset, count); + System.Buffer.BlockCopy(src, srcOffset, _buffer, outputOffset, count); + _bodyLength += count; + } + + public override void Close() + { + // Write the header into the space we reserved at the beginning + // This format matches the MessagePack spec + unchecked + { + _buffer[_headerStartOffset] = MessagePackCode.Bin32; + _buffer[_headerStartOffset + 1] = (byte)(_bodyLength >> 24); + _buffer[_headerStartOffset + 2] = (byte)(_bodyLength >> 16); + _buffer[_headerStartOffset + 3] = (byte)(_bodyLength >> 8); + _buffer[_headerStartOffset + 4] = (byte)(_bodyLength); + } + } + } +} diff --git a/src/Microsoft.AspNetCore.Blazor.Server/Circuits/RenderBatchFormatterResolver.cs b/src/Microsoft.AspNetCore.Blazor.Server/Circuits/RenderBatchFormatterResolver.cs index c95b7d1752..c4ad0f1c6d 100644 --- a/src/Microsoft.AspNetCore.Blazor.Server/Circuits/RenderBatchFormatterResolver.cs +++ b/src/Microsoft.AspNetCore.Blazor.Server/Circuits/RenderBatchFormatterResolver.cs @@ -29,15 +29,19 @@ namespace Microsoft.AspNetCore.Blazor.Server.Circuits public int Serialize(ref byte[] bytes, int offset, RenderBatch value, IFormatterResolver formatterResolver) { - using (var memoryStream = new MemoryStream()) - using (var renderBatchWriter = new RenderBatchWriter(memoryStream, leaveOpen: false)) + // Instead of using MessagePackBinary.WriteBytes, we write into a stream that + // knows how to format its output as a MessagePack binary block. The benefit + // is that we don't have to allocate a second large buffer to capture the + // RenderBatchWriter output - we can just write directly to the underlying + // output buffer. + using (var binaryBlockStream = new MessagePackBinaryBlockStream(bytes, offset)) + using (var renderBatchWriter = new RenderBatchWriter(binaryBlockStream, leaveOpen: false)) { renderBatchWriter.Write(value); - - var bytesBuffer = memoryStream.GetBuffer(); - return MessagePackBinary.WriteBytes(ref bytes, offset, bytesBuffer, 0, (int)memoryStream.Length); + + bytes = binaryBlockStream.Buffer; // In case the buffer was expanded + return (int)binaryBlockStream.Length; } - } } } diff --git a/test/Microsoft.AspnetCore.Blazor.Server.Test/Circuits/MessagePackBinaryBlockStreamTest.cs b/test/Microsoft.AspnetCore.Blazor.Server.Test/Circuits/MessagePackBinaryBlockStreamTest.cs new file mode 100644 index 0000000000..3e6dd725cc --- /dev/null +++ b/test/Microsoft.AspnetCore.Blazor.Server.Test/Circuits/MessagePackBinaryBlockStreamTest.cs @@ -0,0 +1,119 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using MessagePack; +using Microsoft.AspNetCore.Blazor.Server.Circuits; +using System; +using Xunit; + +namespace Microsoft.AspNetCore.Blazor.Server +{ + public class MessagePackBinaryBlockStreamTest + { + [Fact] + public void NullBuffer_Throws() + { + var ex = Assert.Throws(() => + { + new MessagePackBinaryBlockStream(null, 0); + }); + + Assert.Equal("buffer", ex.ParamName); + } + + [Fact] + public void WithNoWrites_JustOutputsHeader() + { + // Arrange + var buffer = new byte[100]; + var offset = 58; // Arbitrary + + // Act + new MessagePackBinaryBlockStream(buffer, offset).Dispose(); + + // Assert + Assert.Equal(MessagePackCode.Bin32, buffer[offset]); + Assert.Equal(0, ReadBigEndianInt32(buffer, offset + 1)); + } + + [Fact] + public void WithWrites_WritesToUnderlyingBuffer() + { + // Arrange + var buffer = new byte[100]; + var offset = 58; // Arbitrary + + // Act/Assert + using (var stream = new MessagePackBinaryBlockStream(buffer, offset)) + { + stream.Write(new byte[] { 10, 20, 30, 40 }, 1, 2); // Write 2 bytes + stream.Write(new byte[] { 101 }, 0, 1); // Write another 1 byte + stream.Close(); + + Assert.Equal(MessagePackCode.Bin32, buffer[offset]); + Assert.Equal(3, ReadBigEndianInt32(buffer, offset + 1)); + Assert.Equal(20, buffer[offset + 5]); + Assert.Equal(30, buffer[offset + 6]); + Assert.Equal(101, buffer[offset + 7]); + } + } + + [Fact] + public void LengthIncludesHeaderButPositionDoesNot() + { + // Arrange + var buffer = new byte[20]; + var offset = 3; + + // Act/Assert + using (var stream = new MessagePackBinaryBlockStream(buffer, offset)) + { + stream.Write(new byte[] { 0x01, 0x02 }, 0, 2); + Assert.Equal(7, stream.Length); + Assert.Equal(2, stream.Position); + } + } + + [Fact] + public void WithWrites_ExpandsBufferWhenNeeded() + { + // Arrange + var origBuffer = new byte[15]; + var offset = 6; + origBuffer[0] = 123; // So we can check it was retained during expansion + + // Act/Assert + using (var stream = new MessagePackBinaryBlockStream(origBuffer, offset)) + { + // We can fit the 6-byte offset plus 5-byte header plus 3 written bytes + // into the original 15-byte buffer + stream.Write(new byte[] { 10, 20, 30 }, 0, 3); + Assert.Same(origBuffer, stream.Buffer); + + // Trying to add two more exceeds the capacity, so the buffer expands + stream.Write(new byte[] { 40, 50 }, 0, 2); + Assert.NotSame(origBuffer, stream.Buffer); + Assert.True(stream.Buffer.Length > origBuffer.Length); + + // Check the expanded buffer has the expected contents + stream.Close(); + Assert.Equal(123, stream.Buffer[0]); // Retains other values from original buffer + Assert.Equal(MessagePackCode.Bin32, stream.Buffer[offset]); + Assert.Equal(5, ReadBigEndianInt32(stream.Buffer, offset + 1)); + Assert.Equal(10, stream.Buffer[offset + 5]); + Assert.Equal(20, stream.Buffer[offset + 6]); + Assert.Equal(30, stream.Buffer[offset + 7]); + Assert.Equal(40, stream.Buffer[offset + 8]); + Assert.Equal(50, stream.Buffer[offset + 9]); + } + } + + int ReadBigEndianInt32(byte[] buffer, int startOffset) + { + return (buffer[startOffset] << 24) + + (buffer[startOffset + 1] << 16) + + (buffer[startOffset + 2] << 8) + + (buffer[startOffset + 3]); + } + } +}