Remove custom stream->pipe wrappers and make BodyReader/BodyWriter read-only (#10154)

This commit is contained in:
Andrew Stanton-Nurse 2019-05-31 18:38:25 -07:00 committed by Justin Kotalik
parent 1165a6fb16
commit a96642f6fd
44 changed files with 386 additions and 4792 deletions

View File

@ -250,7 +250,7 @@ namespace Microsoft.AspNetCore.Http
{
protected HttpRequest() { }
public abstract System.IO.Stream Body { get; set; }
public virtual System.IO.Pipelines.PipeReader BodyReader { get { throw null; } set { } }
public virtual System.IO.Pipelines.PipeReader BodyReader { get { throw null; } }
public abstract long? ContentLength { get; set; }
public abstract string ContentType { get; set; }
public abstract Microsoft.AspNetCore.Http.IRequestCookieCollection Cookies { get; set; }
@ -274,7 +274,7 @@ namespace Microsoft.AspNetCore.Http
{
protected HttpResponse() { }
public abstract System.IO.Stream Body { get; set; }
public virtual System.IO.Pipelines.PipeWriter BodyWriter { get { throw null; } set { } }
public virtual System.IO.Pipelines.PipeWriter BodyWriter { get { throw null; } }
public abstract long? ContentLength { get; set; }
public abstract string ContentType { get; set; }
public abstract Microsoft.AspNetCore.Http.IResponseCookies Cookies { get; }

View File

@ -105,9 +105,9 @@ namespace Microsoft.AspNetCore.Http
public abstract Stream Body { get; set; }
/// <summary>
/// Gets or sets the request body pipe <see cref="PipeReader"/>.
/// Gets the request body pipe <see cref="PipeReader"/>.
/// </summary>
public virtual PipeReader BodyReader { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public virtual PipeReader BodyReader { get => throw new NotImplementedException(); }
/// <summary>
/// Checks the Content-Type header for form types.

View File

@ -45,9 +45,9 @@ namespace Microsoft.AspNetCore.Http
public abstract Stream Body { get; set; }
/// <summary>
/// Gets or sets the response body pipe <see cref="PipeWriter"/>
/// Gets the response body pipe <see cref="PipeWriter"/>
/// </summary>
public virtual PipeWriter BodyWriter { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public virtual PipeWriter BodyWriter { get => throw new NotImplementedException(); }
/// <summary>
/// Gets or sets the value for the <c>Content-Length</c> response header.

View File

@ -3,8 +3,7 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.IO.Pipelines.Tests;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;
@ -36,15 +35,12 @@ namespace Microsoft.AspNetCore.Http
[MemberData(nameof(Encodings))]
public async Task WritingTextThatRequiresMultipleSegmentsWorks(Encoding encoding)
{
// Need to change the StreamPipeWriter with a capped MemoryPool
var memoryPool = new TestMemoryPool(maxBufferSize: 16);
var outputStream = new MemoryStream();
var streamPipeWriter = new StreamPipeWriter(outputStream, minimumSegmentSize: 0, memoryPool);
HttpContext context = new DefaultHttpContext();
context.Response.BodyWriter = streamPipeWriter;
context.Response.Body = outputStream;
var inputString = "昨日すき焼きを食べました";
var inputString = string.Concat(Enumerable.Repeat("昨日すき焼きを食べました", 1000));
var expected = encoding.GetBytes(inputString);
await context.Response.WriteAsync(inputString, encoding);
@ -52,11 +48,8 @@ namespace Microsoft.AspNetCore.Http
var actual = new byte[expected.Length];
var length = outputStream.Read(actual);
var res1 = encoding.GetString(actual);
var res2 = encoding.GetString(expected);
Assert.Equal(expected.Length, length);
Assert.Equal(expected, actual);
streamPipeWriter.Complete();
}
[Theory]

View File

@ -246,7 +246,7 @@ namespace Microsoft.AspNetCore.Http.Features
}
public partial interface IRequestBodyPipeFeature
{
System.IO.Pipelines.PipeReader Reader { get; set; }
System.IO.Pipelines.PipeReader Reader { get; }
}
public partial interface IRequestCookiesFeature
{
@ -254,7 +254,7 @@ namespace Microsoft.AspNetCore.Http.Features
}
public partial interface IResponseBodyPipeFeature
{
System.IO.Pipelines.PipeWriter Writer { get; set; }
System.IO.Pipelines.PipeWriter Writer { get; }
}
public partial interface IResponseCookiesFeature
{

View File

@ -13,6 +13,6 @@ namespace Microsoft.AspNetCore.Http.Features
/// <summary>
/// A <see cref="PipeReader"/> representing the request body, if any.
/// </summary>
PipeReader Reader { get; set; }
PipeReader Reader { get; }
}
}

View File

@ -14,6 +14,6 @@ namespace Microsoft.AspNetCore.Http.Features
/// <summary>
/// A <see cref="PipeWriter"/> representing the response body, if any.
/// </summary>
PipeWriter Writer { get; set; }
PipeWriter Writer { get; }
}
}

View File

@ -1,70 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Http
{
public class NoopStream : Stream
{
public override bool CanRead => true;
public override bool CanSeek => throw new NotImplementedException();
public override bool CanWrite => true;
public override long Length => throw new NotImplementedException();
public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
return 0;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return Task.FromResult<int>(0);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return new ValueTask<int>(0);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
}
public override void Write(byte[] buffer, int offset, int count)
{
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
{
return default(ValueTask);
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

View File

@ -1,82 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
namespace Microsoft.AspNetCore.Http
{
public class StreamPipeReaderBenchmark
{
private StreamPipeReader _pipeReaderNoop;
private StreamPipeReader _pipeReaderHelloWorld;
private StreamPipeReader _pipeReaderHelloWorldAync;
[IterationSetup]
public void Setup()
{
_pipeReaderNoop = new StreamPipeReader(new NoopStream());
_pipeReaderHelloWorld = new StreamPipeReader(new HelloWorldStream());
_pipeReaderHelloWorldAync = new StreamPipeReader(new HelloWorldAsyncStream());
}
[Benchmark]
public async Task ReadNoop()
{
await _pipeReaderNoop.ReadAsync();
}
[Benchmark]
public async Task ReadHelloWorld()
{
var result = await _pipeReaderHelloWorld.ReadAsync();
_pipeReaderHelloWorld.AdvanceTo(result.Buffer.End);
}
[Benchmark]
public async Task ReadHelloWorldAsync()
{
var result = await _pipeReaderHelloWorldAync.ReadAsync();
_pipeReaderHelloWorldAync.AdvanceTo(result.Buffer.End);
}
private class HelloWorldStream : NoopStream
{
private static byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
bytes.CopyTo(buffer, 0);
return Task.FromResult(11);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
bytes.CopyTo(buffer);
return new ValueTask<int>(11);
}
}
private class HelloWorldAsyncStream : NoopStream
{
private static byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await Task.Yield();
bytes.CopyTo(buffer, 0);
return 11;
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
await Task.Yield();
bytes.CopyTo(buffer);
return 11;
}
}
}
}

View File

@ -1,38 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
namespace Microsoft.AspNetCore.Http
{
public class StreamPipeWriterBenchmark
{
private Stream _memoryStream;
private StreamPipeWriter _pipeWriter;
private static byte[] _helloWorldBytes = Encoding.ASCII.GetBytes("Hello World");
private static byte[] _largeWrite = Encoding.ASCII.GetBytes(new string('a', 50000));
[IterationSetup]
public void Setup()
{
_memoryStream = new NoopStream();
_pipeWriter = new StreamPipeWriter(_memoryStream);
}
[Benchmark]
public async Task WriteHelloWorld()
{
await _pipeWriter.WriteAsync(_helloWorldBytes);
}
[Benchmark]
public async Task WriteHelloWorldLargeWrite()
{
await _pipeWriter.WriteAsync(_largeWrite);
}
}
}

View File

@ -236,7 +236,7 @@ namespace Microsoft.AspNetCore.Http.Features
public partial class RequestBodyPipeFeature : Microsoft.AspNetCore.Http.Features.IRequestBodyPipeFeature
{
public RequestBodyPipeFeature(Microsoft.AspNetCore.Http.HttpContext context) { }
public System.IO.Pipelines.PipeReader Reader { get { throw null; } set { } }
public System.IO.Pipelines.PipeReader Reader { get { throw null; } }
}
public partial class RequestCookiesFeature : Microsoft.AspNetCore.Http.Features.IRequestCookiesFeature
{
@ -254,7 +254,7 @@ namespace Microsoft.AspNetCore.Http.Features
public partial class ResponseBodyPipeFeature : Microsoft.AspNetCore.Http.Features.IResponseBodyPipeFeature
{
public ResponseBodyPipeFeature(Microsoft.AspNetCore.Http.HttpContext context) { }
public System.IO.Pipelines.PipeWriter Writer { get { throw null; } set { } }
public System.IO.Pipelines.PipeWriter Writer { get { throw null; } }
}
public partial class ResponseCookiesFeature : Microsoft.AspNetCore.Http.Features.IResponseCookiesFeature
{
@ -326,7 +326,7 @@ namespace Microsoft.AspNetCore.Http.Internal
{
public DefaultHttpRequest(Microsoft.AspNetCore.Http.DefaultHttpContext context) { }
public override System.IO.Stream Body { get { throw null; } set { } }
public override System.IO.Pipelines.PipeReader BodyReader { get { throw null; } set { } }
public override System.IO.Pipelines.PipeReader BodyReader { get { throw null; } }
public override long? ContentLength { get { throw null; } set { } }
public override string ContentType { get { throw null; } set { } }
public override Microsoft.AspNetCore.Http.IRequestCookieCollection Cookies { get { throw null; } set { } }
@ -353,7 +353,7 @@ namespace Microsoft.AspNetCore.Http.Internal
{
public DefaultHttpResponse(Microsoft.AspNetCore.Http.DefaultHttpContext context) { }
public override System.IO.Stream Body { get { throw null; } set { } }
public override System.IO.Pipelines.PipeWriter BodyWriter { get { throw null; } set { } }
public override System.IO.Pipelines.PipeWriter BodyWriter { get { throw null; } }
public override long? ContentLength { get { throw null; } set { } }
public override string ContentType { get { throw null; } set { } }
public override Microsoft.AspNetCore.Http.IResponseCookies Cookies { get { throw null; } }
@ -492,93 +492,3 @@ namespace Microsoft.Extensions.DependencyInjection
public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddHttpContextAccessor(this Microsoft.Extensions.DependencyInjection.IServiceCollection services) { throw null; }
}
}
namespace System.IO.Pipelines
{
public partial class ReadOnlyPipeStream : System.IO.Stream
{
public ReadOnlyPipeStream(System.IO.Pipelines.PipeReader pipeReader) { }
public ReadOnlyPipeStream(System.IO.Pipelines.PipeReader pipeReader, bool allowSynchronousIO) { }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public System.IO.Pipelines.PipeReader InnerPipeReader { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public override int WriteTimeout { get { throw null; } set { } }
public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) { throw null; }
public override System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, int bufferSize, System.Threading.CancellationToken cancellationToken) { throw null; }
public override int EndRead(System.IAsyncResult asyncResult) { throw null; }
public override void Flush() { }
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override int Read(byte[] buffer, int offset, int count) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public override void Write(byte[] buffer, int offset, int count) { }
public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class StreamPipeReader : System.IO.Pipelines.PipeReader, System.IDisposable
{
public StreamPipeReader(System.IO.Stream readingStream) { }
public StreamPipeReader(System.IO.Stream readingStream, System.IO.Pipelines.StreamPipeReaderAdapterOptions options) { }
public System.IO.Stream InnerStream { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public override void AdvanceTo(System.SequencePosition consumed) { }
public override void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined) { }
public override void CancelPendingRead() { }
public override void Complete(System.Exception exception = null) { }
public void Dispose() { }
public override void OnWriterCompleted(System.Action<System.Exception, object> callback, object state) { }
[System.Diagnostics.DebuggerStepThroughAttribute]
public override System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override bool TryRead(out System.IO.Pipelines.ReadResult result) { throw null; }
}
public partial class StreamPipeReaderAdapterOptions
{
public const int DefaultMinimumReadThreshold = 256;
public const int DefaultMinimumSegmentSize = 4096;
public static System.IO.Pipelines.StreamPipeReaderAdapterOptions DefaultOptions;
public StreamPipeReaderAdapterOptions() { }
public StreamPipeReaderAdapterOptions(int minimumSegmentSize, int minimumReadThreshold, System.Buffers.MemoryPool<byte> memoryPool) { }
public System.Buffers.MemoryPool<byte> MemoryPool { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public int MinimumReadThreshold { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public int MinimumSegmentSize { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
}
public partial class StreamPipeWriter : System.IO.Pipelines.PipeWriter, System.IDisposable
{
public StreamPipeWriter(System.IO.Stream writingStream) { }
public StreamPipeWriter(System.IO.Stream writingStream, int minimumSegmentSize, System.Buffers.MemoryPool<byte> pool = null) { }
public System.IO.Stream InnerStream { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public override void Advance(int count) { }
public override void CancelPendingFlush() { }
public override void Complete(System.Exception exception = null) { }
public void Dispose() { }
public override System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Memory<byte> GetMemory(int sizeHint = 0) { throw null; }
public override System.Span<byte> GetSpan(int sizeHint = 0) { throw null; }
public override void OnReaderCompleted(System.Action<System.Exception, object> callback, object state) { }
}
public partial class WriteOnlyPipeStream : System.IO.Stream
{
public WriteOnlyPipeStream(System.IO.Pipelines.PipeWriter pipeWriter) { }
public WriteOnlyPipeStream(System.IO.Pipelines.PipeWriter pipeWriter, bool allowSynchronousIO) { }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public System.IO.Pipelines.PipeWriter InnerPipeWriter { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public override int ReadTimeout { get { throw null; } set { } }
public override System.IAsyncResult BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) { throw null; }
public override void EndWrite(System.IAsyncResult asyncResult) { }
public override void Flush() { }
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override int Read(byte[] buffer, int offset, int count) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public override void Write(byte[] buffer, int offset, int count) { }
public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
}

View File

@ -2,14 +2,16 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Http.Features
{
public class RequestBodyPipeFeature : IRequestBodyPipeFeature
{
private StreamPipeReader _internalPipeReader;
private PipeReader _userSetPipeReader;
private PipeReader _internalPipeReader;
private Stream _streamInstanceWhenWrapped;
private HttpContext _context;
public RequestBodyPipeFeature(HttpContext context)
@ -25,25 +27,21 @@ namespace Microsoft.AspNetCore.Http.Features
{
get
{
if (_userSetPipeReader != null)
{
return _userSetPipeReader;
}
if (_internalPipeReader == null ||
!object.ReferenceEquals(_internalPipeReader.InnerStream, _context.Request.Body))
!ReferenceEquals(_streamInstanceWhenWrapped, _context.Request.Body))
{
_internalPipeReader = new StreamPipeReader(_context.Request.Body);
_context.Response.RegisterForDispose(_internalPipeReader);
_streamInstanceWhenWrapped = _context.Request.Body;
_internalPipeReader = PipeReader.Create(_context.Request.Body);
_context.Response.OnCompleted((self) =>
{
((PipeReader)self).Complete();
return Task.CompletedTask;
}, _internalPipeReader);
}
return _internalPipeReader;
}
set
{
_userSetPipeReader = value ?? throw new ArgumentNullException(nameof(value));
// TODO set the request body Stream to an adapted pipe https://github.com/aspnet/AspNetCore/issues/3971
}
}
}
}

View File

@ -2,14 +2,16 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Http.Features
{
public class ResponseBodyPipeFeature : IResponseBodyPipeFeature
{
private StreamPipeWriter _internalPipeWriter;
private PipeWriter _userSetPipeWriter;
private PipeWriter _internalPipeWriter;
private Stream _streamInstanceWhenWrapped;
private HttpContext _context;
public ResponseBodyPipeFeature(HttpContext context)
@ -25,25 +27,21 @@ namespace Microsoft.AspNetCore.Http.Features
{
get
{
if (_userSetPipeWriter != null)
{
return _userSetPipeWriter;
}
if (_internalPipeWriter == null ||
!object.ReferenceEquals(_internalPipeWriter.InnerStream, _context.Response.Body))
!ReferenceEquals(_streamInstanceWhenWrapped, _context.Response.Body))
{
_internalPipeWriter = new StreamPipeWriter(_context.Response.Body);
_context.Response.RegisterForDispose(_internalPipeWriter);
_streamInstanceWhenWrapped = _context.Response.Body;
_internalPipeWriter = PipeWriter.Create(_context.Response.Body);
_context.Response.OnCompleted((self) =>
{
((PipeWriter)self).Complete();
return Task.CompletedTask;
}, _internalPipeWriter);
}
return _internalPipeWriter;
}
set
{
_userSetPipeWriter = value ?? throw new ArgumentNullException(nameof(value));
// TODO set the response body Stream to an adapted pipe https://github.com/aspnet/AspNetCore/issues/3971
}
}
}
}

View File

@ -174,7 +174,6 @@ namespace Microsoft.AspNetCore.Http.Internal
public override PipeReader BodyReader
{
get { return RequestBodyPipeFeature.Reader; }
set { RequestBodyPipeFeature.Reader = value; }
}
struct FeatureInterfaces

View File

@ -112,7 +112,6 @@ namespace Microsoft.AspNetCore.Http.Internal
public override PipeWriter BodyWriter
{
get { return ResponseBodyPipeFeature.Writer; }
set { ResponseBodyPipeFeature.Writer = value; }
}
public override void OnStarting(Func<object, Task> callback, object state)

View File

@ -1,247 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
/// <summary>
/// Represents a read-only Stream backed by a PipeReader
/// </summary>
public class ReadOnlyPipeStream : Stream
{
private bool _allowSynchronousIO = true;
/// <summary>
/// Creates a new ReadOnlyPipeStream
/// </summary>
/// <param name="pipeReader">The PipeReader to read from.</param>
public ReadOnlyPipeStream(PipeReader pipeReader) :
this(pipeReader, allowSynchronousIO: true)
{
}
/// <summary>
/// Creates a new ReadOnlyPipeStream
/// </summary>
/// <param name="pipeReader">The PipeReader to read from.</param>
/// <param name="allowSynchronousIO">Whether synchronous IO is allowed.</param>
public ReadOnlyPipeStream(PipeReader pipeReader, bool allowSynchronousIO)
{
_allowSynchronousIO = allowSynchronousIO;
InnerPipeReader = pipeReader;
}
/// <inheritdoc />
public override bool CanSeek => false;
/// <inheritdoc />
public override bool CanRead => true;
/// <inheritdoc />
public override bool CanWrite => false;
/// <inheritdoc />
public override long Length => throw new NotSupportedException();
/// <inheritdoc />
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override int WriteTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public PipeReader InnerPipeReader { get; }
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
/// <inheritdoc />
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
/// <inheritdoc />
public override void Flush()
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override Task FlushAsync(CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousReadsDisallowed();
}
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
/// <inheritdoc />
public override int EndRead(IAsyncResult asyncResult)
{
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(task2.Result);
}
}, tcs, cancellationToken);
return tcs.Task;
}
/// <inheritdoc />
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
/// <inheritdoc />
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination, cancellationToken);
}
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
while (true)
{
var result = await InnerPipeReader.ReadAsync(cancellationToken);
if (result.IsCanceled)
{
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
var consumed = readableBuffer.End;
var actual = 0;
try
{
if (readableBufferLength != 0)
{
actual = (int)Math.Min(readableBufferLength, buffer.Length);
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
return actual;
}
if (result.IsCompleted)
{
return 0;
}
}
finally
{
InnerPipeReader.AdvanceTo(consumed);
}
}
}
/// <inheritdoc />
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
if (destination == null)
{
throw new ArgumentNullException(nameof(destination));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize));
}
return CopyToAsyncInternal(destination, cancellationToken);
}
private async Task CopyToAsyncInternal(Stream destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await InnerPipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
try
{
if (readableBufferLength != 0)
{
foreach (var memory in readableBuffer)
{
await destination.WriteAsync(memory, cancellationToken);
}
}
if (result.IsCompleted)
{
return;
}
}
finally
{
InnerPipeReader.AdvanceTo(readableBuffer.End);
}
}
}
}
}

View File

@ -1,393 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
/// <summary>
/// Implements PipeReader using an underlying stream.
/// </summary>
public class StreamPipeReader : PipeReader, IDisposable
{
private readonly int _bufferSize;
private readonly int _minimumReadThreshold;
private readonly MemoryPool<byte> _pool;
private CancellationTokenSource _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
private ExceptionDispatchInfo _exceptionInfo;
private BufferSegment _readHead;
private int _readIndex;
private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private object _lock = new object();
/// <summary>
/// Creates a new StreamPipeReader.
/// </summary>
/// <param name="readingStream">The stream to read from.</param>
public StreamPipeReader(Stream readingStream)
: this(readingStream, StreamPipeReaderAdapterOptions.DefaultOptions)
{
}
/// <summary>
/// Creates a new StreamPipeReader.
/// </summary>
/// <param name="readingStream">The stream to read from.</param>
/// <param name="options">The options to use.</param>
public StreamPipeReader(Stream readingStream, StreamPipeReaderAdapterOptions options)
{
InnerStream = readingStream ?? throw new ArgumentNullException(nameof(readingStream));
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
if (options.MinimumReadThreshold <= 0)
{
throw new ArgumentOutOfRangeException(nameof(options.MinimumReadThreshold));
}
_minimumReadThreshold = Math.Min(options.MinimumReadThreshold, options.MinimumSegmentSize);
_pool = options.MemoryPool == MemoryPool<byte>.Shared ? null : options.MemoryPool;
_bufferSize = _pool == null ? options.MinimumSegmentSize : Math.Min(options.MinimumSegmentSize, _pool.MaxBufferSize);
}
/// <summary>
/// Gets the inner stream that is being read from.
/// </summary>
public Stream InnerStream { get; }
/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}
private CancellationTokenSource InternalTokenSource
{
get
{
lock (_lock)
{
if (_internalTokenSource == null)
{
_internalTokenSource = new CancellationTokenSource();
}
return _internalTokenSource;
}
}
set
{
_internalTokenSource = value;
}
}
/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
ThrowIfCompleted();
if (_readHead == null || _readTail == null)
{
ThrowHelper.ThrowInvalidOperationException_NoDataRead();
}
AdvanceTo((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
}
private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
if (consumedSegment == null)
{
return;
}
var returnStart = _readHead;
var returnEnd = consumedSegment;
var consumedBytes = new ReadOnlySequence<byte>(returnStart, _readIndex, consumedSegment, consumedIndex).Length;
_bufferedBytes -= consumedBytes;
Debug.Assert(_bufferedBytes >= 0);
_examinedEverything = false;
if (examinedSegment == _readTail)
{
// If we examined everything, we force ReadAsync to actually read from the underlying stream
// instead of returning a ReadResult from TryRead.
_examinedEverything = examinedIndex == _readTail.End;
}
// Three cases here:
// 1. All data is consumed. If so, we reset _readHead and _readTail to _readTail's original memory owner
// SetMemory on a IMemoryOwner will reset the internal Memory<byte> to be an empty segment
// 2. A segment is entirely consumed but there is still more data in nextSegments
// We are allowed to remove an extra segment. by setting returnEnd to be the next block.
// 3. We are in the middle of a segment.
// Move _readHead and _readIndex to consumedSegment and index
if (_bufferedBytes == 0)
{
_readTail.SetMemory(_readTail.MemoryOwner);
_readHead = _readTail;
returnEnd = _readTail;
_readIndex = 0;
}
else if (consumedIndex == returnEnd.Length)
{
var nextBlock = returnEnd.NextSegment;
_readHead = nextBlock;
_readIndex = 0;
returnEnd = nextBlock;
}
else
{
_readHead = consumedSegment;
_readIndex = consumedIndex;
}
// Remove all blocks that are freed (except the last one)
while (returnStart != returnEnd)
{
returnStart.ResetMemory();
returnStart = returnStart.NextSegment;
}
}
/// <inheritdoc />
public override void CancelPendingRead()
{
InternalTokenSource.Cancel();
}
/// <inheritdoc />
public override void Complete(Exception exception = null)
{
if (_isReaderCompleted)
{
return;
}
_isReaderCompleted = true;
if (exception != null)
{
_exceptionInfo = ExceptionDispatchInfo.Capture(exception);
}
var segment = _readHead;
while (segment != null)
{
segment.ResetMemory();
segment = segment.NextSegment;
}
}
/// <inheritdoc />
public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
throw new NotSupportedException("OnWriterCompleted is not supported");
}
/// <inheritdoc />
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
var tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out var readResult))
{
return readResult;
}
if (_isStreamCompleted)
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
}
var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.Register(state => ((StreamPipeReader)state).Cancel(), this);
}
using (reg)
{
var isCanceled = false;
try
{
AllocateReadTail();
#if NETCOREAPP3_0
var length = await InnerStream.ReadAsync(_readTail.AvailableMemory.Slice(_readTail.End), tokenSource.Token);
#elif NETSTANDARD2_0
if (!MemoryMarshal.TryGetArray<byte>(_readTail.AvailableMemory.Slice(_readTail.End), out var arraySegment))
{
ThrowHelper.CreateInvalidOperationException_NoArrayFromMemory();
}
var length = await _readingStream.ReadAsync(arraySegment.Array, arraySegment.Offset, arraySegment.Count, tokenSource.Token);
#else
#error Target frameworks need to be updated.
#endif
Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);
_readTail.End += length;
_bufferedBytes += length;
if (length == 0)
{
_isStreamCompleted = true;
}
}
catch (OperationCanceledException)
{
ClearCancellationToken();
if (cancellationToken.IsCancellationRequested)
{
throw;
}
isCanceled = true;
}
return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, IsCompletedOrThrow());
}
}
private void ClearCancellationToken()
{
lock (_lock)
{
_internalTokenSource = null;
}
}
private void ThrowIfCompleted()
{
if (_isReaderCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
}
public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();
return TryReadInternal(InternalTokenSource, out result);
}
private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
{
var isCancellationRequested = source.IsCancellationRequested;
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
{
// If TryRead/ReadAsync are called and cancellation is requested, we need to make sure memory is allocated for the ReadResult,
// otherwise if someone calls advance afterward on the ReadResult, it will throw.
if (isCancellationRequested)
{
AllocateReadTail();
ClearCancellationToken();
}
result = new ReadResult(
GetCurrentReadOnlySequence(),
isCanceled: isCancellationRequested,
IsCompletedOrThrow());
return true;
}
result = new ReadResult();
return false;
}
private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}
private void AllocateReadTail()
{
if (_readHead == null)
{
Debug.Assert(_readTail == null);
_readHead = AllocateSegment();
_readTail = _readHead;
}
else if (_readTail.WritableBytes < _minimumReadThreshold)
{
CreateNewTailSegment();
}
}
private void CreateNewTailSegment()
{
BufferSegment nextSegment = AllocateSegment();
_readTail.SetNext(nextSegment);
_readTail = nextSegment;
}
private BufferSegment AllocateSegment()
{
var nextSegment = new BufferSegment();
if (_pool is null)
{
nextSegment.SetMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
}
else
{
nextSegment.SetMemory(_pool.Rent(_bufferSize));
}
return nextSegment;
}
private void Cancel()
{
InternalTokenSource.Cancel();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsCompletedOrThrow()
{
if (!_isStreamCompleted)
{
return false;
}
if (_exceptionInfo != null)
{
ThrowLatchedException();
}
return true;
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowLatchedException()
{
_exceptionInfo.Throw();
}
public void Dispose()
{
Complete();
}
}
}

View File

@ -1,34 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;
namespace System.IO.Pipelines
{
public class StreamPipeReaderAdapterOptions
{
public static StreamPipeReaderAdapterOptions DefaultOptions = new StreamPipeReaderAdapterOptions();
public const int DefaultMinimumSegmentSize = 4096;
public const int DefaultMinimumReadThreshold = 256;
public StreamPipeReaderAdapterOptions()
{
}
public StreamPipeReaderAdapterOptions(int minimumSegmentSize, int minimumReadThreshold, MemoryPool<byte> memoryPool)
{
MinimumSegmentSize = minimumSegmentSize;
MinimumReadThreshold = minimumReadThreshold;
MemoryPool = memoryPool;
}
public int MinimumSegmentSize { get; set; } = DefaultMinimumSegmentSize;
public int MinimumReadThreshold { get; set; } = DefaultMinimumReadThreshold;
public MemoryPool<byte> MemoryPool { get; set; } = MemoryPool<byte>.Shared;
}
}

View File

@ -1,347 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.IO.Pipelines
{
/// <summary>
/// Implements PipeWriter using a underlying stream.
/// </summary>
public class StreamPipeWriter : PipeWriter, IDisposable
{
private readonly int _minimumSegmentSize;
private int _bytesWritten;
private List<CompletedBuffer> _completedSegments;
private Memory<byte> _currentSegment;
private object _currentSegmentOwner;
private MemoryPool<byte> _pool;
private int _position;
private CancellationTokenSource _internalTokenSource;
private bool _isCompleted;
private object _lockObject = new object();
private CancellationTokenSource InternalTokenSource
{
get
{
lock (_lockObject)
{
if (_internalTokenSource == null)
{
_internalTokenSource = new CancellationTokenSource();
}
return _internalTokenSource;
}
}
}
/// <summary>
/// Creates a new StreamPipeWrapper
/// </summary>
/// <param name="writingStream">The stream to write to</param>
public StreamPipeWriter(Stream writingStream) : this(writingStream, 4096)
{
}
public StreamPipeWriter(Stream writingStream, int minimumSegmentSize, MemoryPool<byte> pool = null)
{
_minimumSegmentSize = minimumSegmentSize;
InnerStream = writingStream;
_pool = pool == MemoryPool<byte>.Shared ? null : pool;
}
/// <summary>
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream { get; }
/// <inheritdoc />
public override void Advance(int count)
{
if (_currentSegment.IsEmpty) // TODO confirm this
{
throw new InvalidOperationException("No writing operation. Make sure GetMemory() was called.");
}
if (count >= 0)
{
if (_currentSegment.Length < _position + count)
{
throw new InvalidOperationException("Can't advance past buffer size.");
}
_bytesWritten += count;
_position += count;
}
}
/// <inheritdoc />
public override Memory<byte> GetMemory(int sizeHint = 0)
{
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(sizeHint));
}
EnsureCapacity(sizeHint);
return _currentSegment.Slice(_position);
}
/// <inheritdoc />
public override Span<byte> GetSpan(int sizeHint = 0)
{
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(sizeHint));
}
EnsureCapacity(sizeHint);
return _currentSegment.Span.Slice(_position);
}
/// <inheritdoc />
public override void CancelPendingFlush()
{
Cancel();
}
/// <inheritdoc />
public override void Complete(Exception exception = null)
{
if (_isCompleted)
{
return;
}
Dispose();
// We still want to cleanup segments before throwing an exception.
if (_bytesWritten > 0 && exception == null)
{
ThrowHelper.ThrowInvalidOperationException_DataNotAllFlushed();
}
}
/// <inheritdoc />
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
throw new NotSupportedException("OnReaderCompleted isn't supported in StreamPipeWrapper.");
}
/// <inheritdoc />
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
if (_bytesWritten == 0)
{
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, _isCompleted));
}
return FlushAsyncInternal(cancellationToken);
}
private void Cancel()
{
InternalTokenSource.Cancel();
}
private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancellationToken = default)
{
// Write all completed segments and whatever remains in the current segment
// and flush the result.
var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.Register(state => ((StreamPipeWriter)state).Cancel(), this);
}
using (reg)
{
var localToken = InternalTokenSource.Token;
try
{
if (_completedSegments != null && _completedSegments.Count > 0)
{
var count = _completedSegments.Count;
for (var i = 0; i < count; i++)
{
var segment = _completedSegments[0];
#if NETCOREAPP3_0
await InnerStream.WriteAsync(segment.Buffer.Slice(0, segment.Length), localToken);
#elif NETSTANDARD2_0
MemoryMarshal.TryGetArray<byte>(segment.Buffer, out var arraySegment);
await InnerStream.WriteAsync(arraySegment.Array, 0, segment.Length, localToken);
#else
#error Target frameworks need to be updated.
#endif
_bytesWritten -= segment.Length;
segment.Return();
_completedSegments.RemoveAt(0);
}
}
if (!_currentSegment.IsEmpty)
{
#if NETCOREAPP3_0
await InnerStream.WriteAsync(_currentSegment.Slice(0, _position), localToken);
#elif NETSTANDARD2_0
MemoryMarshal.TryGetArray<byte>(_currentSegment, out var arraySegment);
await InnerStream.WriteAsync(arraySegment.Array, 0, _position, localToken);
#else
#error Target frameworks need to be updated.
#endif
_bytesWritten -= _position;
_position = 0;
}
await InnerStream.FlushAsync(localToken);
return new FlushResult(isCanceled: false, _isCompleted);
}
catch (OperationCanceledException)
{
// Remove the cancellation token such that the next time Flush is called
// A new CTS is created.
lock (_lockObject)
{
_internalTokenSource = null;
}
if (cancellationToken.IsCancellationRequested)
{
throw;
}
// Catch any cancellation and translate it into setting isCanceled = true
return new FlushResult(isCanceled: true, _isCompleted);
}
}
}
private void EnsureCapacity(int sizeHint)
{
// This does the Right Thing. It only subtracts _position from the current segment length if it's non-null.
// If _currentSegment is null, it returns 0.
var remainingSize = _currentSegment.Length - _position;
// If the sizeHint is 0, any capacity will do
// Otherwise, the buffer must have enough space for the entire size hint, or we need to add a segment.
if ((sizeHint == 0 && remainingSize > 0) || (sizeHint > 0 && remainingSize >= sizeHint))
{
// We have capacity in the current segment
return;
}
AddSegment(sizeHint);
}
private void AddSegment(int sizeHint = 0)
{
if (_currentSegment.Length != 0)
{
// We're adding a segment to the list
if (_completedSegments == null)
{
_completedSegments = new List<CompletedBuffer>();
}
// Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
// ignore any empty space in it.
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
}
if (_pool is null)
{
_currentSegment = ArrayPool<byte>.Shared.Rent(Math.Max(sizeHint, _minimumSegmentSize));
_currentSegmentOwner = _currentSegment;
}
else if (sizeHint <= _pool.MaxBufferSize)
{
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
// Also, the size cannot be larger than the MaxBufferSize of the MemoryPool
var owner = _pool.Rent(Math.Clamp(sizeHint, _minimumSegmentSize, _pool.MaxBufferSize));
_currentSegment = owner.Memory;
_currentSegmentOwner = owner;
}
else
{
_currentSegment = new byte[sizeHint];
}
_position = 0;
}
public void Dispose()
{
if (_isCompleted)
{
return;
}
_isCompleted = true;
_internalTokenSource?.Dispose();
if (_completedSegments != null)
{
foreach (var segment in _completedSegments)
{
segment.Return();
}
}
DisposeOwner(_currentSegmentOwner);
}
private static void DisposeOwner(object owner)
{
if (owner is IMemoryOwner<byte> memoryOwner)
{
memoryOwner.Dispose();
}
else if (owner is byte[] array)
{
ArrayPool<byte>.Shared.Return(array);
}
}
/// <summary>
/// Holds a byte[] from the pool and a size value. Basically a Memory but guaranteed to be backed by an ArrayPool byte[], so that we know we can return it.
/// </summary>
private readonly struct CompletedBuffer
{
private readonly object _memoryOwner;
public Memory<byte> Buffer { get; }
public int Length { get; }
public ReadOnlySpan<byte> Span => Buffer.Span;
public CompletedBuffer(object owner, Memory<byte> buffer, int length)
{
_memoryOwner = owner;
Buffer = buffer;
Length = length;
}
public void Return()
{
DisposeOwner(_memoryOwner);
}
}
}
}

View File

@ -1,50 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
namespace System.IO.Pipelines
{
internal static class ThrowHelper
{
public static void ThrowInvalidOperationException_NoReadingAllowed() => throw CreateInvalidOperationException_NoReadingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException("Reading is not allowed after reader was completed.");
public static void ThrowInvalidOperationException_NoArrayFromMemory() => throw CreateInvalidOperationException_NoArrayFromMemory();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoArrayFromMemory() => new InvalidOperationException("Could not get byte[] from Memory.");
public static void ThrowInvalidOperationException_NoDataRead() => throw CreateInvalidOperationException_NoDataRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoDataRead() => new InvalidOperationException("No data has been read into the StreamPipeReader.");
public static void ThrowInvalidOperationException_SynchronousReadsDisallowed() => throw CreateInvalidOperationException_SynchronousReadsDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousReadsDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call ReadAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_SynchronousWritesDisallowed() => throw CreateInvalidOperationException_SynchronousWritesDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousWritesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_SynchronousFlushesDisallowed() => throw CreateInvalidOperationException_SynchronousFlushesDisallowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_SynchronousFlushesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call FlushAsync or set allowSynchronousIO to true instead.");
public static void ThrowInvalidOperationException_DataNotAllFlushed() => throw CreateInvalidOperationException_DataNotAllFlushed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_DataNotAllFlushed() => new InvalidOperationException("Complete called without flushing the StreamPipeWriter. Call FlushAsync() before calling Complete().");
public static void ThrowInvalidOperationException_NoWritingAllowed() => throw CreateInvalidOperationException_NoWritingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException("Writing is not allowed after writer was completed.");
public static void ThrowArgumentOutOfRangeException(string argument) => throw CreateArgumentOutOfRangeException(argument);
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateArgumentOutOfRangeException(string argument) => new ArgumentOutOfRangeException(argument);
public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException("Read was canceled on underlying PipeReader.");
}
}

View File

@ -1,169 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Internal;
namespace System.IO.Pipelines
{
/// <summary>
/// Represents a WriteOnlyStream backed by a PipeWriter
/// </summary>
public class WriteOnlyPipeStream : Stream
{
private bool _allowSynchronousIO = true;
/// <summary>
/// Creates a new WriteOnlyStream
/// </summary>
/// <param name="pipeWriter">The PipeWriter to write to.</param>
public WriteOnlyPipeStream(PipeWriter pipeWriter) :
this(pipeWriter, allowSynchronousIO: true)
{
}
/// <summary>
/// Creates a new WriteOnlyStream
/// </summary>
/// <param name="pipeWriter">The PipeWriter to write to.</param>
/// <param name="allowSynchronousIO">Whether synchronous IO is allowed.</param>
public WriteOnlyPipeStream(PipeWriter pipeWriter, bool allowSynchronousIO)
{
InnerPipeWriter = pipeWriter;
_allowSynchronousIO = allowSynchronousIO;
}
/// <inheritdoc />
public override bool CanSeek => false;
/// <inheritdoc />
public override bool CanRead => false;
/// <inheritdoc />
public override bool CanWrite => true;
/// <inheritdoc />
public override long Length => throw new NotSupportedException();
/// <inheritdoc />
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override int ReadTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public PipeWriter InnerPipeWriter { get; }
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
/// <inheritdoc />
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
/// <inheritdoc />
public override void Flush()
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousFlushesDisallowed();
}
FlushAsync(default).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override Task FlushAsync(CancellationToken cancellationToken)
{
return InnerPipeWriter.FlushAsync(cancellationToken).GetAsTask();
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO)
{
ThrowHelper.ThrowInvalidOperationException_SynchronousWritesDisallowed();
}
WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult();
}
/// <inheritdoc />
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
/// <inheritdoc />
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
/// <inheritdoc />
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
/// <inheritdoc />
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return new ValueTask(WriteAsyncInternal(source, cancellationToken));
}
private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return InnerPipeWriter.WriteAsync(source, cancellationToken).GetAsTask();
}
}
}

View File

@ -98,7 +98,9 @@ namespace Microsoft.AspNetCore.Http.Features
await pipe.Writer.WriteAsync(formContent);
pipe.Writer.Complete();
context.Request.BodyReader = pipe.Reader;
var mockFeature = new MockRequestBodyPipeFeature();
mockFeature.Reader = pipe.Reader;
context.Features.Set<IRequestBodyPipeFeature>(mockFeature);
IFormFeature formFeature = new FormFeature(context.Request, new FormOptions() { BufferBody = bufferRequest });
context.Features.Set<IFormFeature>(formFeature);
@ -108,16 +110,21 @@ namespace Microsoft.AspNetCore.Http.Features
Assert.Equal("bar", formCollection["foo"]);
Assert.Equal("2", formCollection["baz"]);
// Cached
// Cached
formFeature = context.Features.Get<IFormFeature>();
Assert.NotNull(formFeature);
Assert.NotNull(formFeature.Form);
Assert.Same(formFeature.Form, formCollection);
// Cleanup
// Cleanup
await responseFeature.CompleteAsync();
}
private class MockRequestBodyPipeFeature : IRequestBodyPipeFeature
{
public PipeReader Reader { get; set; }
}
private const string MultipartContentType = "multipart/form-data; boundary=WebKitFormBoundary5pDRpGheQXaM8k3T";
private const string MultipartContentTypeWithSpecialCharacters = "multipart/form-data; boundary=\"WebKitFormBoundary/:5pDRpGheQXaM8k3T\"";

View File

@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
@ -24,78 +23,7 @@ namespace Microsoft.AspNetCore.Http.Features
var pipeBody = feature.Reader;
Assert.True(pipeBody is StreamPipeReader);
Assert.Equal(expectedStream, (pipeBody as StreamPipeReader).InnerStream);
}
[Fact]
public async Task RequestBodyReadCanWorkWithPipe()
{
var expectedString = "abcdef";
var feature = InitializeFeatureWithData(expectedString);
var data = await feature.Reader.ReadAsync();
Assert.Equal(expectedString, GetStringFromReadResult(data));
}
[Fact]
public void RequestBodySetPipeReaderReturnsSameValue()
{
var context = new DefaultHttpContext();
var feature = new RequestBodyPipeFeature(context);
var pipeReader = new Pipe().Reader;
feature.Reader = pipeReader;
Assert.Equal(pipeReader, feature.Reader);
}
[Fact]
public void RequestBodySetPipeReadReturnsUserSetValueAlways()
{
var context = new DefaultHttpContext();
var feature = new RequestBodyPipeFeature(context);
var expectedPipeReader = new Pipe().Reader;
feature.Reader = expectedPipeReader;
// Because the user set the RequestBodyPipe, this will return the user set pipeReader
context.Request.Body = new MemoryStream();
Assert.Equal(expectedPipeReader, feature.Reader);
}
[Fact]
public async Task RequestBodyDoesNotAffectUserSetPipe()
{
var expectedString = "abcdef";
var feature = InitializeFeatureWithData("hahaha");
feature.Reader = await GetPipeReaderWithData(expectedString);
var data = await feature.Reader.ReadAsync();
Assert.Equal(expectedString, GetStringFromReadResult(data));
}
[Fact]
public void RequestBodyGetPipeReaderAfterSettingBodyTwice()
{
var context = new DefaultHttpContext();
context.Request.Body = new MemoryStream();
var feature = new RequestBodyPipeFeature(context);
var pipeBody = feature.Reader;
// Requery the PipeReader after setting the body again.
var expectedStream = new MemoryStream();
context.Request.Body = expectedStream;
pipeBody = feature.Reader;
Assert.True(pipeBody is StreamPipeReader);
Assert.Equal(expectedStream, (pipeBody as StreamPipeReader).InnerStream);
Assert.NotNull(pipeBody);
}
[Fact]
@ -112,23 +40,9 @@ namespace Microsoft.AspNetCore.Http.Features
Assert.Equal(expectedString, GetStringFromReadResult(data));
}
private RequestBodyPipeFeature InitializeFeatureWithData(string input)
{
var context = new DefaultHttpContext();
context.Request.Body = new MemoryStream(Encoding.ASCII.GetBytes(input));
return new RequestBodyPipeFeature(context);
}
private static string GetStringFromReadResult(ReadResult data)
{
return Encoding.ASCII.GetString(data.Buffer.ToArray());
}
private async Task<PipeReader> GetPipeReaderWithData(string input)
{
var pipe = new Pipe();
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes(input));
return pipe.Reader;
}
}
}

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using System.IO.Pipelines;
using Xunit;
namespace Microsoft.AspNetCore.Http.Features
@ -20,37 +19,7 @@ namespace Microsoft.AspNetCore.Http.Features
var pipeBody = feature.Writer;
Assert.True(pipeBody is StreamPipeWriter);
Assert.Equal(expectedStream, (pipeBody as StreamPipeWriter).InnerStream);
}
[Fact]
public void ResponseBodySetPipeReaderReturnsSameValue()
{
var context = new DefaultHttpContext();
var feature = new ResponseBodyPipeFeature(context);
var pipeWriter = new Pipe().Writer;
feature.Writer = pipeWriter;
Assert.Equal(pipeWriter, feature.Writer);
}
[Fact]
public void ResponseBodyGetPipeWriterAfterSettingBodyTwice()
{
var context = new DefaultHttpContext();
var expectedStream = new MemoryStream();
context.Response.Body = new MemoryStream();
var feature = new ResponseBodyPipeFeature(context);
var pipeBody = feature.Writer;
context.Response.Body = expectedStream;
pipeBody = feature.Writer;
Assert.True(pipeBody is StreamPipeWriter);
Assert.Equal(expectedStream, (pipeBody as StreamPipeWriter).InnerStream);
Assert.NotNull(pipeBody);
}
}
}

View File

@ -1,30 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class FlushResultCancellationTests : StreamPipeTest
{
[Fact]
public async Task FlushAsyncWithNewCancellationTokenNotAffectedByPrevious()
{
var cancellationTokenSource1 = new CancellationTokenSource();
PipeWriter buffer = Writer.WriteEmpty(10);
await buffer.FlushAsync(cancellationTokenSource1.Token);
cancellationTokenSource1.Cancel();
var cancellationTokenSource2 = new CancellationTokenSource();
buffer = Writer.WriteEmpty(10);
await buffer.FlushAsync(cancellationTokenSource2.Token);
}
}
}

View File

@ -251,36 +251,6 @@ namespace Microsoft.AspNetCore.Http.Internal
Assert.NotNull(bodyPipe);
}
[Fact]
public void BodyReader_CanSet()
{
var pipeReader = new Pipe().Reader;
var context = new DefaultHttpContext();
context.Request.BodyReader = pipeReader;
Assert.Equal(pipeReader, context.Request.BodyReader);
}
[Fact]
public void BodyReader_WrapsStream()
{
var context = new DefaultHttpContext();
var expectedStream = new MemoryStream();
context.Request.Body = expectedStream;
var bodyPipe = context.Request.BodyReader as StreamPipeReader;
Assert.Equal(expectedStream, bodyPipe.InnerStream);
}
[Fact]
public void BodyReader_ThrowsWhenSettingNull()
{
var context = new DefaultHttpContext();
Assert.Throws<ArgumentNullException>(() => context.Request.BodyReader = null);
}
private class CustomRouteValuesFeature : IRouteValuesFeature
{
public RouteValueDictionary RouteValues { get; set; }

View File

@ -73,35 +73,6 @@ namespace Microsoft.AspNetCore.Http.Internal
Assert.NotNull(bodyPipe);
}
[Fact]
public void BodyWriter_CanSet()
{
var response = new DefaultHttpContext();
var pipeWriter = new Pipe().Writer;
response.Response.BodyWriter = pipeWriter;
Assert.Equal(pipeWriter, response.Response.BodyWriter);
}
[Fact]
public void BodyWriter_WrapsStream()
{
var context = new DefaultHttpContext();
var expectedStream = new MemoryStream();
context.Response.Body = expectedStream;
var bodyPipe = context.Response.BodyWriter as StreamPipeWriter;
Assert.Equal(expectedStream, bodyPipe.InnerStream);
}
[Fact]
public void BodyWriter_ThrowsWhenSettingNull()
{
var context = new DefaultHttpContext();
Assert.Throws<ArgumentNullException>(() => context.Response.BodyWriter = null);
}
[Fact]
public async Task ResponseStart_CallsFeatureIfSet()
{

View File

@ -1,85 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Text;
using System.Threading.Tasks;
namespace System.IO.Pipelines.Tests
{
public class PipeStreamTest : IDisposable
{
public Stream ReadingStream { get; set; }
public Stream WritingStream { get; set; }
public Pipe Pipe { get; set; }
public PipeReader Reader => Pipe.Reader;
public PipeWriter Writer => Pipe.Writer;
public PipeStreamTest()
{
Pipe = new Pipe();
ReadingStream = new ReadOnlyPipeStream(Reader);
WritingStream = new WriteOnlyPipeStream(Writer);
}
public void Dispose()
{
Writer.Complete();
Reader.Complete();
}
public async Task WriteStringToStreamAsync(string input)
{
await WritingStream.WriteAsync(Encoding.ASCII.GetBytes(input));
}
public async Task WriteStringToPipeAsync(string input)
{
await Writer.WriteAsync(Encoding.ASCII.GetBytes(input));
}
public async Task WriteByteArrayToPipeAsync(byte[] input)
{
await Writer.WriteAsync(input);
}
public async Task<string> ReadFromPipeAsStringAsync()
{
var readResult = await Reader.ReadAsync();
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
Reader.AdvanceTo(readResult.Buffer.End);
return result;
}
public async Task<string> ReadFromStreamAsStringAsync()
{
var memory = new Memory<byte>(new byte[4096]);
var readLength = await ReadingStream.ReadAsync(memory);
var result = Encoding.ASCII.GetString(memory.ToArray(), 0, readLength);
return result;
}
public async Task<byte[]> ReadFromPipeAsByteArrayAsync()
{
var readResult = await Reader.ReadAsync();
var result = readResult.Buffer.ToArray();
Reader.AdvanceTo(readResult.Buffer.End);
return result;
}
public Task<byte[]> ReadFromStreamAsByteArrayAsync(int size)
{
return ReadFromStreamAsByteArrayAsync(size, ReadingStream);
}
public async Task<byte[]> ReadFromStreamAsByteArrayAsync(int size, Stream stream)
{
var memory = new Memory<byte>(new byte[size]);
var readLength = await stream.ReadAsync(memory);
return memory.Slice(0, readLength).ToArray();
}
}
}

View File

@ -1,221 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class PipeWriterTests : StreamPipeTest
{
[Theory]
[InlineData(3, -1, 0)]
[InlineData(3, 0, -1)]
[InlineData(3, 0, 4)]
[InlineData(3, 4, 0)]
[InlineData(3, -1, -1)]
[InlineData(3, 4, 4)]
public void ThrowsForInvalidParameters(int arrayLength, int offset, int length)
{
var array = new byte[arrayLength];
for (var i = 0; i < array.Length; i++)
{
array[i] = (byte)(i + 1);
}
Writer.Write(new Span<byte>(array, 0, 0));
Writer.Write(new Span<byte>(array, array.Length, 0));
try
{
Writer.Write(new Span<byte>(array, offset, length));
Assert.True(false);
}
catch (Exception ex)
{
Assert.True(ex is ArgumentOutOfRangeException);
}
Writer.Write(new Span<byte>(array, 0, array.Length));
Assert.Equal(array, Read());
}
[Theory]
[InlineData(0, 3)]
[InlineData(1, 2)]
[InlineData(2, 1)]
[InlineData(1, 1)]
public void CanWriteWithOffsetAndLength(int offset, int length)
{
var array = new byte[] { 1, 2, 3 };
Writer.Write(new Span<byte>(array, offset, length));
Assert.Equal(array.Skip(offset).Take(length).ToArray(), Read());
}
[Fact]
public void CanWriteIntoHeadlessBuffer()
{
Writer.Write(new byte[] { 1, 2, 3 });
Assert.Equal(new byte[] { 1, 2, 3 }, Read());
}
[Fact]
public void CanGetNewMemoryWhenSizeTooLarge()
{
var memory = Writer.GetMemory(0);
var memoryLarge = Writer.GetMemory(10000);
Assert.NotEqual(memory, memoryLarge);
}
[Fact]
public void CanGetSameMemoryWhenNoAdvance()
{
var memory = Writer.GetMemory(0);
var secondMemory = Writer.GetMemory(0);
Assert.Equal(memory, secondMemory);
}
[Theory]
[InlineData(0)]
[InlineData(2048)]
public void GetSpanWithZeroSizeHintReturnsMaxBufferSizeOfPool(int sizeHint)
{
var span = Writer.GetSpan(sizeHint);
Assert.Equal(4096, span.Length);
}
[Fact]
public void CanGetSameSpanWhenNoAdvance()
{
var span = Writer.GetSpan(0);
var secondSpan = Writer.GetSpan(0);
Assert.True(span.SequenceEqual(secondSpan));
}
[Theory]
[InlineData(16, 32, 32)]
[InlineData(16, 16, 16)]
[InlineData(64, 32, 64)]
[InlineData(40, 32, 64)] // memory sizes are powers of 2.
public void CheckMinimumSegmentSizeWithGetMemory(int minimumSegmentSize, int getMemorySize, int expectedSize)
{
var writer = new StreamPipeWriter(new MemoryStream(), minimumSegmentSize);
var memory = writer.GetMemory(getMemorySize);
Assert.Equal(expectedSize, memory.Length);
}
[Fact]
public void CanWriteMultipleTimes()
{
Writer.Write(new byte[] { 1 });
Writer.Write(new byte[] { 2 });
Writer.Write(new byte[] { 3 });
Assert.Equal(new byte[] { 1, 2, 3 }, Read());
}
[Fact]
public void CanWriteOverTheBlockLength()
{
Memory<byte> memory = Writer.GetMemory();
IEnumerable<byte> source = Enumerable.Range(0, memory.Length).Select(i => (byte)i);
byte[] expectedBytes = source.Concat(source).Concat(source).ToArray();
Writer.Write(expectedBytes);
Assert.Equal(expectedBytes, Read());
}
[Fact]
public void EnsureAllocatesSpan()
{
var span = Writer.GetSpan(10);
Assert.True(span.Length >= 10);
// 0 byte Flush would not complete the reader so we complete.
Writer.Complete();
Assert.Equal(new byte[] { }, Read());
}
[Fact]
public void SlicesSpanAndAdvancesAfterWrite()
{
int initialLength = Writer.GetSpan(3).Length;
Writer.Write(new byte[] { 1, 2, 3 });
Span<byte> span = Writer.GetSpan();
Assert.Equal(initialLength - 3, span.Length);
Assert.Equal(new byte[] { 1, 2, 3 }, Read());
}
[Theory]
[InlineData(5)]
[InlineData(50)]
[InlineData(500)]
[InlineData(5000)]
[InlineData(50000)]
public async Task WriteLargeDataBinary(int length)
{
var data = new byte[length];
new Random(length).NextBytes(data);
PipeWriter output = Writer;
output.Write(data);
await output.FlushAsync();
var input = Read();
Assert.Equal(data, input.ToArray());
}
[Fact]
public async Task CanWriteNothingToBuffer()
{
Writer.GetMemory(0);
Writer.Advance(0); // doing nothing, the hard way
await Writer.FlushAsync();
}
[Fact]
public void EmptyWriteDoesNotThrow()
{
Writer.Write(new byte[0]);
}
[Fact]
public void ThrowsOnAdvanceOverMemorySize()
{
Memory<byte> buffer = Writer.GetMemory(1);
var exception = Assert.Throws<InvalidOperationException>(() => Writer.Advance(buffer.Length + 1));
Assert.Equal("Can't advance past buffer size.", exception.Message);
}
[Fact]
public void ThrowsOnAdvanceWithNoMemory()
{
PipeWriter buffer = Writer;
var exception = Assert.Throws<InvalidOperationException>(() => buffer.Advance(1));
Assert.Equal("No writing operation. Make sure GetMemory() was called.", exception.Message);
}
}
}

View File

@ -1,128 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadAsyncCancellationTests : StreamPipeTest
{
[Fact]
public async Task AdvanceShouldResetStateIfReadCanceled()
{
Reader.CancelPendingRead();
var result = await Reader.ReadAsync();
var buffer = result.Buffer;
Reader.AdvanceTo(buffer.End);
Assert.False(result.IsCompleted);
Assert.True(result.IsCanceled);
Assert.True(buffer.IsEmpty);
}
[Fact]
public async Task CancellingBeforeAdvance()
{
Write(Encoding.ASCII.GetBytes("Hello World"));
var result = await Reader.ReadAsync();
var buffer = result.Buffer;
Assert.Equal(11, buffer.Length);
Assert.False(result.IsCanceled);
Assert.True(buffer.IsSingleSegment);
var array = new byte[11];
buffer.First.Span.CopyTo(array);
Assert.Equal("Hello World", Encoding.ASCII.GetString(array));
Reader.CancelPendingRead();
Reader.AdvanceTo(buffer.End);
var awaitable = Reader.ReadAsync();
Assert.True(awaitable.IsCompleted);
result = await awaitable;
Assert.True(result.IsCanceled);
Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start);
}
[Fact]
public async Task ReadAsyncWithNewCancellationTokenNotAffectedByPrevious()
{
Write(new byte[1]);
var cancellationTokenSource1 = new CancellationTokenSource();
var result = await Reader.ReadAsync(cancellationTokenSource1.Token);
Reader.AdvanceTo(result.Buffer.Start);
cancellationTokenSource1.Cancel();
var cancellationTokenSource2 = new CancellationTokenSource();
// Verifying that ReadAsync does not throw
result = await Reader.ReadAsync(cancellationTokenSource2.Token);
Reader.AdvanceTo(result.Buffer.Start);
}
[Fact]
public async Task CancellingPendingReadBeforeReadAsync()
{
Reader.CancelPendingRead();
ReadResult result = await Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
Reader.AdvanceTo(buffer.End);
Assert.False(result.IsCompleted);
Assert.True(result.IsCanceled);
Assert.True(buffer.IsEmpty);
byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
Write(bytes);
result = await Reader.ReadAsync();
buffer = result.Buffer;
Assert.Equal(11, buffer.Length);
Assert.False(result.IsCanceled);
Assert.True(buffer.IsSingleSegment);
var array = new byte[11];
buffer.First.Span.CopyTo(array);
Assert.Equal("Hello World", Encoding.ASCII.GetString(array));
Reader.AdvanceTo(buffer.Start, buffer.Start);
}
[Fact]
public void ReadAsyncCompletedAfterPreCancellation()
{
Reader.CancelPendingRead();
Write(new byte[] { 1, 2, 3 });
ValueTaskAwaiter<ReadResult> awaitable = Reader.ReadAsync().GetAwaiter();
Assert.True(awaitable.IsCompleted);
ReadResult result = awaitable.GetResult();
Assert.True(result.IsCanceled);
awaitable = Reader.ReadAsync().GetAwaiter();
Assert.True(awaitable.IsCompleted);
Reader.AdvanceTo(awaitable.GetResult().Buffer.End);
}
}
}

View File

@ -1,186 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadOnlyPipeStreamTests : PipeStreamTest
{
[Fact]
public void CanSeekFalse()
{
Assert.False(ReadingStream.CanSeek);
}
[Fact]
public void CanReadTrue()
{
Assert.True(ReadingStream.CanRead);
}
[Fact]
public void CanWriteFalse()
{
Assert.False(ReadingStream.CanWrite);
}
[Fact]
public void LengthThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Length);
}
[Fact]
public void PositionThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Position);
Assert.Throws<NotSupportedException>(() => ReadingStream.Position = 1);
}
[Fact]
public void SeekThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Seek(0, SeekOrigin.Begin));
}
[Fact]
public void SetLengthThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.SetLength(1));
}
[Fact]
public void WriteThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.Write(new byte[1], 0, 1));
}
[Fact]
public async Task WriteAsyncThrows()
{
await Assert.ThrowsAsync<NotSupportedException>(async () => await ReadingStream.WriteAsync(new byte[1], 0, 1));
}
[Fact]
public void ReadTimeoutThrows()
{
Assert.Throws<NotSupportedException>(() => ReadingStream.WriteTimeout = 1);
Assert.Throws<NotSupportedException>(() => ReadingStream.WriteTimeout);
}
[Fact]
public async Task ReadAsyncWorks()
{
var expected = "Hello World!";
await WriteStringToPipeAsync(expected);
Assert.Equal(expected, await ReadFromStreamAsStringAsync());
}
[Fact]
public async Task BasicLargeRead()
{
var expected = new byte[8000];
await WriteByteArrayToPipeAsync(expected);
Assert.Equal(expected, await ReadFromStreamAsByteArrayAsync(8000));
}
[Fact]
public async Task ReadAsyncIsCalledFromCallingRead()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
stream.Read(new byte[1]);
pipeReader.Verify(m => m.ReadAsync(It.IsAny<CancellationToken>()));
}
[Fact]
public async Task ReadAsyncIsCalledFromCallingReadAsync()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
await stream.ReadAsync(new byte[1]);
pipeReader.Verify(m => m.ReadAsync(It.IsAny<CancellationToken>()));
}
[Fact]
public async Task ReadAsyncCancellationTokenIsPassedIntoReadAsync()
{
var pipeReader = await SetupMockPipeReader();
var stream = new ReadOnlyPipeStream(pipeReader.Object);
var token = new CancellationToken();
await stream.ReadAsync(new byte[1], token);
pipeReader.Verify(m => m.ReadAsync(token));
}
[Fact]
public async Task CopyToAsyncWorks()
{
const int expectedSize = 8000;
var expected = new byte[expectedSize];
await WriteByteArrayToPipeAsync(expected);
Writer.Complete();
var destStream = new MemoryStream();
await ReadingStream.CopyToAsync(destStream);
Assert.Equal(expectedSize, destStream.Length);
}
[Fact]
public void BlockSyncIOThrows()
{
var readOnlyPipeStream = new ReadOnlyPipeStream(Reader, allowSynchronousIO: false);
Assert.Throws<InvalidOperationException>(() => readOnlyPipeStream.Read(new byte[0], 0, 0));
}
[Fact]
public void InnerPipeReaderReturnsPipeReader()
{
var readOnlyPipeStream = new ReadOnlyPipeStream(Reader, allowSynchronousIO: false);
Assert.Equal(Reader, readOnlyPipeStream.InnerPipeReader);
}
[Fact]
public async Task ThrowsOperationCanceledExceptionIfCancelPendingReadWasCalledOnInnerPipeReader()
{
var readOnlyPipeStream = new ReadOnlyPipeStream(Reader);
var readOperation = readOnlyPipeStream.ReadAsync(new byte[1]);
Assert.False(readOperation.IsCompleted);
Reader.CancelPendingRead();
var ex = await Assert.ThrowsAsync<OperationCanceledException>(async () => await readOperation);
Assert.Equal(ThrowHelper.CreateOperationCanceledException_ReadCanceled().Message, ex.Message);
}
private async Task<Mock<PipeReader>> SetupMockPipeReader()
{
await WriteByteArrayToPipeAsync(new byte[1]);
var pipeReader = new Mock<PipeReader>();
pipeReader
.Setup(m => m.ReadAsync(It.IsAny<CancellationToken>()))
.Returns(new ValueTask<ReadResult>(new ReadResult(new ReadOnlySequence<byte>(new byte[1]), false, false)));
return pipeReader;
}
}
}

View File

@ -1,147 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class ReadingAdaptersInteropTests
{
[Fact]
public async Task CheckBasicReadPipeApi()
{
var pipe = new Pipe();
var readStream = new ReadOnlyPipeStream(pipe.Reader);
var pipeReader = new StreamPipeReader(readStream);
await pipe.Writer.WriteAsync(new byte[10]);
var res = await pipeReader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckNestedPipeApi()
{
var pipe = new Pipe();
var reader = pipe.Reader;
for (var i = 0; i < 3; i++)
{
var readStream = new ReadOnlyPipeStream(reader);
reader = new StreamPipeReader(readStream);
}
await pipe.Writer.WriteAsync(new byte[10]);
var res = await reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckBasicReadStreamApi()
{
var stream = new MemoryStream();
await stream.WriteAsync(new byte[10]);
stream.Position = 0;
var pipeReader = new StreamPipeReader(stream);
var readOnlyStream = new ReadOnlyPipeStream(pipeReader);
var resSize = await readOnlyStream.ReadAsync(new byte[10]);
Assert.Equal(10, resSize);
}
[Fact]
public async Task CheckNestedStreamApi()
{
var stream = new MemoryStream();
await stream.WriteAsync(new byte[10]);
stream.Position = 0;
Stream readOnlyStream = stream;
for (var i = 0; i < 3; i++)
{
var pipeReader = new StreamPipeReader(readOnlyStream);
readOnlyStream = new ReadOnlyPipeStream(pipeReader);
}
var resSize = await readOnlyStream.ReadAsync(new byte[10]);
Assert.Equal(10, resSize);
}
[Fact]
public async Task ReadsCanBeCanceledViaProvidedCancellationToken()
{
var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader());
var pipeReader = new StreamPipeReader(readOnlyStream);
var cts = new CancellationTokenSource(1);
await Task.Delay(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeReader.ReadAsync(cts.Token));
}
[Fact]
public async Task ReadCanBeCancelledViaCancelPendingReadWhenReadIsAsync()
{
var readOnlyStream = new ReadOnlyPipeStream(new HangingPipeReader());
var pipeReader = new StreamPipeReader(readOnlyStream);
var result = new ReadResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
var readingTask = pipeReader.ReadAsync();
tcs.SetResult(0);
result = await readingTask;
});
await tcs.Task;
pipeReader.CancelPendingRead();
await task;
Assert.True(result.IsCanceled);
}
private class HangingPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
{
throw new NotImplementedException();
}
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
throw new NotImplementedException();
}
public override void CancelPendingRead()
{
throw new NotImplementedException();
}
public override void Complete(Exception exception = null)
{
throw new NotImplementedException();
}
public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
throw new NotImplementedException();
}
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
await Task.Delay(30000, cancellationToken);
return new ReadResult();
}
public override bool TryRead(out ReadResult result)
{
result = new ReadResult();
return false;
}
}
}
}

View File

@ -1,752 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public partial class StreamPipeReaderTests : StreamPipeTest
{
[Fact]
public async Task CanRead()
{
Write(Encoding.ASCII.GetBytes("Hello World"));
var readResult = await Reader.ReadAsync();
var buffer = readResult.Buffer;
Assert.Equal(11, buffer.Length);
Assert.True(buffer.IsSingleSegment);
var array = new byte[11];
buffer.First.Span.CopyTo(array);
Assert.Equal("Hello World", Encoding.ASCII.GetString(array));
Reader.AdvanceTo(buffer.End);
}
[Fact]
public async Task CanReadMultipleTimes()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Assert.Equal(MinimumSegmentSize, readResult.Buffer.Length);
Assert.True(readResult.Buffer.IsSingleSegment);
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
readResult = await Reader.ReadAsync();
Assert.Equal(MinimumSegmentSize * 2, readResult.Buffer.Length);
Assert.False(readResult.Buffer.IsSingleSegment);
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
readResult = await Reader.ReadAsync();
Assert.Equal(10000, readResult.Buffer.Length);
Assert.False(readResult.Buffer.IsSingleSegment);
Reader.AdvanceTo(readResult.Buffer.End);
}
[Fact]
public async Task ReadWithAdvance()
{
WriteByteArray(9000);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.End);
readResult = await Reader.ReadAsync();
Assert.Equal(MinimumSegmentSize, readResult.Buffer.Length);
Assert.True(readResult.Buffer.IsSingleSegment);
}
[Fact]
public async Task ReadWithAdvanceDifferentSegmentSize()
{
CreateReader(minimumSegmentSize: 4095);
WriteByteArray(9000);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.End);
readResult = await Reader.ReadAsync();
Assert.Equal(4095, readResult.Buffer.Length);
Assert.True(readResult.Buffer.IsSingleSegment);
}
[Fact]
public async Task ReadWithAdvanceSmallSegments()
{
CreateReader();
WriteByteArray(128);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.End);
readResult = await Reader.ReadAsync();
Assert.Equal(16, readResult.Buffer.Length);
Assert.True(readResult.Buffer.IsSingleSegment);
}
[Fact]
public async Task ReadConsumePartialReadAsyncCallsTryRead()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.GetPosition(2048));
// Confirm readResults are the same.
var readResult2 = await Reader.ReadAsync();
var didRead = Reader.TryRead(out var readResult3);
Assert.Equal(readResult2, readResult3);
}
[Fact]
public async Task ReadConsumeEntireTryReadReturnsNothing()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.End);
var didRead = Reader.TryRead(out readResult);
Assert.False(didRead);
}
[Fact]
public async Task ReadExaminePartialReadAsyncDoesNotReturnMoreBytes()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.GetPosition(2048));
var readResult2 = await Reader.ReadAsync();
Assert.Equal(readResult, readResult2);
}
[Fact]
public async Task ReadExamineEntireReadAsyncReturnsNewData()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
var readResult2 = await Reader.ReadAsync();
Assert.NotEqual(readResult, readResult2);
}
[Fact]
public async Task ReadCanBeCancelledViaProvidedCancellationToken()
{
var pipeReader = new StreamPipeReader(new HangingStream());
var cts = new CancellationTokenSource(1);
await Task.Delay(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeReader.ReadAsync(cts.Token));
}
[Fact]
public async Task ReadCanBeCanceledViaCancelPendingReadWhenReadIsAsync()
{
var pipeReader = new StreamPipeReader(new HangingStream());
var result = new ReadResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
var readingTask = pipeReader.ReadAsync();
tcs.SetResult(0);
result = await readingTask;
});
await tcs.Task;
pipeReader.CancelPendingRead();
await task;
Assert.True(result.IsCanceled);
}
[Fact]
public async Task ReadAsyncReturnsCanceledIfCanceledBeforeRead()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
// Make sure state isn't used from before
for (var i = 0; i < 3; i++)
{
Reader.CancelPendingRead();
var readResultTask = Reader.ReadAsync();
Assert.True(readResultTask.IsCompleted);
var readResult = readResultTask.GetAwaiter().GetResult();
Assert.True(readResult.IsCanceled);
readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.End);
}
}
[Fact]
public async Task ReadAsyncReturnsCanceledInterleaved()
{
Write(new byte[10000]);
// Cancel and Read interleaved to confirm cancellations are independent
for (var i = 0; i < 3; i++)
{
Reader.CancelPendingRead();
var readResultTask = Reader.ReadAsync();
Assert.True(readResultTask.IsCompleted);
var readResult = readResultTask.GetAwaiter().GetResult();
Assert.True(readResult.IsCanceled);
readResult = await Reader.ReadAsync();
Assert.False(readResult.IsCanceled);
}
}
[Fact]
public async Task AdvanceWithEmptySequencePositionNoop()
{
Write(Encoding.ASCII.GetBytes(new string('a', 10000)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start);
var readResult2 = await Reader.ReadAsync();
Assert.Equal(readResult, readResult2);
}
[Fact]
public async Task AdvanceToInvalidCursorThrows()
{
Write(new byte[100]);
var result = await Reader.ReadAsync();
var buffer = result.Buffer;
Reader.AdvanceTo(buffer.End);
Reader.CancelPendingRead();
result = await Reader.ReadAsync();
Assert.Throws<ArgumentOutOfRangeException>(() => Reader.AdvanceTo(buffer.End));
Reader.AdvanceTo(result.Buffer.End);
}
[Fact]
public void AdvanceWithoutReadingWithValidSequencePosition()
{
var sequencePosition = new SequencePosition(new BufferSegment(), 5);
Assert.Throws<InvalidOperationException>(() => Reader.AdvanceTo(sequencePosition));
}
[Fact]
public async Task AdvanceMultipleSegments()
{
CreateReader();
WriteByteArray(128);
var result = await Reader.ReadAsync();
Assert.Equal(16, result.Buffer.Length);
Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
var result2 = await Reader.ReadAsync();
Assert.Equal(32, result2.Buffer.Length);
Reader.AdvanceTo(result.Buffer.End, result2.Buffer.End);
var result3 = await Reader.ReadAsync();
Assert.Equal(32, result3.Buffer.Length);
}
[Fact]
public async Task AdvanceMultipleSegmentsEdgeCase()
{
CreateReader();
WriteByteArray(128);
var result = await Reader.ReadAsync();
Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
result = await Reader.ReadAsync();
Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
var result2 = await Reader.ReadAsync();
Assert.Equal(48, result2.Buffer.Length);
Reader.AdvanceTo(result.Buffer.End, result2.Buffer.End);
var result3 = await Reader.ReadAsync();
Assert.Equal(32, result3.Buffer.Length);
}
[Fact]
public async Task CompleteReaderWithoutAdvanceDoesNotThrow()
{
WriteByteArray(100);
await Reader.ReadAsync();
Reader.Complete();
}
[Fact]
public async Task AdvanceAfterCompleteThrows()
{
WriteByteArray(100);
var buffer = (await Reader.ReadAsync()).Buffer;
Reader.Complete();
var exception = Assert.Throws<InvalidOperationException>(() => Reader.AdvanceTo(buffer.End));
Assert.Equal("Reading is not allowed after reader was completed.", exception.Message);
}
[Fact]
public async Task ReadBetweenBlocks()
{
var blockSize = 16;
CreateReader();
WriteWithoutPosition(Enumerable.Repeat((byte)'a', blockSize - 5).ToArray());
Write(Encoding.ASCII.GetBytes("Hello World"));
// ReadAsync will only return one chunk at a time, so Advance/ReadAsync to get two chunks
var result = await Reader.ReadAsync();
Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
result = await Reader.ReadAsync();
var buffer = result.Buffer;
Assert.False(buffer.IsSingleSegment);
var helloBuffer = buffer.Slice(blockSize - 5);
Assert.False(helloBuffer.IsSingleSegment);
var memory = new List<ReadOnlyMemory<byte>>();
foreach (var m in helloBuffer)
{
memory.Add(m);
}
var spans = memory;
Reader.AdvanceTo(buffer.Start, buffer.Start);
Assert.Equal(2, memory.Count);
var helloBytes = new byte[spans[0].Length];
spans[0].Span.CopyTo(helloBytes);
var worldBytes = new byte[spans[1].Length];
spans[1].Span.CopyTo(worldBytes);
Assert.Equal("Hello", Encoding.ASCII.GetString(helloBytes));
Assert.Equal(" World", Encoding.ASCII.GetString(worldBytes));
}
[Fact]
public async Task ThrowsOnReadAfterCompleteReader()
{
Reader.Complete();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await Reader.ReadAsync());
}
[Fact]
public void TryReadAfterCancelPendingReadReturnsTrue()
{
Reader.CancelPendingRead();
var gotData = Reader.TryRead(out var result);
Assert.True(result.IsCanceled);
Reader.AdvanceTo(result.Buffer.End);
}
[Fact]
public void ReadAsyncWithDataReadyReturnsTaskWithValue()
{
WriteByteArray(20);
var task = Reader.ReadAsync();
Assert.True(IsTaskWithResult(task));
}
[Fact]
public async Task ArrayPoolUsedByDefault()
{
WriteByteArray(20);
var reader = new StreamPipeReader(Stream);
var result = await reader.ReadAsync();
SequenceMarshal.TryGetReadOnlySequenceSegment(
result.Buffer,
out var startSegment,
out var startIndex,
out var endSegment,
out var endIndex);
var start = (BufferSegment)startSegment;
var end = (BufferSegment)endSegment;
Assert.Same(start, end);
Assert.IsType<byte[]>(start.MemoryOwner);
reader.AdvanceTo(result.Buffer.End);
reader.Complete();
}
[Fact]
public void CancelledReadAsyncReturnsTaskWithValue()
{
Reader.CancelPendingRead();
var task = Reader.ReadAsync();
Assert.True(IsTaskWithResult(task));
}
[Fact]
public async Task AdvancePastMinReadSizeReadAsyncReturnsMoreData()
{
CreateReader();
WriteByteArray(32);
var result = await Reader.ReadAsync();
Assert.Equal(16, result.Buffer.Length);
Reader.AdvanceTo(result.Buffer.GetPosition(12), result.Buffer.End);
result = await Reader.ReadAsync();
Assert.Equal(20, result.Buffer.Length);
}
[Fact]
public async Task ExamineEverythingResetsAfterSuccessfulRead()
{
WriteByteArray(10000);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
var readResult2 = await Reader.ReadAsync();
Reader.AdvanceTo(readResult2.Buffer.GetPosition(2000));
var readResult3 = await Reader.ReadAsync();
Assert.Equal(6192, readResult3.Buffer.Length);
}
[Fact]
public async Task ReadMultipleTimesAdvanceFreesAppropriately()
{
var pool = new TestMemoryPool();
CreateReader(memoryPool: pool);
WriteByteArray(2000);
for (var i = 0; i < 99; i++)
{
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
}
var result = await Reader.ReadAsync();
Reader.AdvanceTo(result.Buffer.End);
Assert.Equal(1, pool.GetRentCount());
}
[Fact]
public async Task AsyncReadWorks()
{
Stream = new AsyncStream();
CreateReader();
WriteByteArray(2000);
for (var i = 0; i < 99; i++)
{
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
}
var result = await Reader.ReadAsync();
Assert.Equal(1600, result.Buffer.Length);
Reader.AdvanceTo(result.Buffer.End);
}
[Fact]
public async Task ConsumePartialBufferWorks()
{
CreateReader();
Write(Encoding.ASCII.GetBytes(new string('a', 8)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End);
Stream.Position = 0;
readResult = await Reader.ReadAsync();
var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
Assert.Equal(new string('a', 12), resultString);
Reader.AdvanceTo(readResult.Buffer.End);
}
[Fact]
public async Task ConsumePartialBufferBetweenMultipleSegmentsWorks()
{
CreateReader();
Write(Encoding.ASCII.GetBytes(new string('a', 8)));
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.GetPosition(4), readResult.Buffer.End);
Stream.Position = 0;
readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
Stream.Position = 0;
readResult = await Reader.ReadAsync();
var resultString = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
Assert.Equal(new string('a', 20), resultString);
Reader.AdvanceTo(readResult.Buffer.End);
}
[Fact]
public async Task SetMinimumReadThresholdSegmentAdvancesCorrectly()
{
CreateReader(minimumReadThreshold: 8);
WriteByteArray(9);
var readResult = await Reader.ReadAsync();
Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
AppendByteArray(9);
readResult = await Reader.ReadAsync();
foreach (var segment in readResult.Buffer)
{
Assert.Equal(9, segment.Length);
}
Assert.False(readResult.Buffer.IsSingleSegment);
}
[Fact]
public void SetMinimumReadThresholdOfZeroThrows()
{
Assert.Throws<ArgumentOutOfRangeException>(() => new StreamPipeReader(Stream,
new StreamPipeReaderAdapterOptions(minimumSegmentSize: 4096, minimumReadThreshold: 0, new TestMemoryPool())));
}
[Fact]
public void SetOptionsToNullThrows()
{
Assert.Throws<ArgumentNullException>(() => new StreamPipeReader(Stream, null));
}
[Fact]
public async Task UseBothStreamAndPipeToReadConfirmSameSize()
{
Write(new byte[8]);
var buffer = new byte[4];
Stream.Read(buffer, 0, buffer.Length);
var readResult = await Reader.ReadAsync();
Assert.Equal(buffer, readResult.Buffer.ToArray());
}
[Fact]
public async Task UseStreamThenPipeToReadNoBytesLost()
{
CreateReader(minimumSegmentSize: 1, minimumReadThreshold: 1);
var expectedString = WriteString("abcdef");
var accumulatedResult = "";
var buffer = new byte[1];
for (var i = 0; i < expectedString.Length / 2; i++)
{
// Read from stream then pipe to guarantee no bytes are lost.
accumulatedResult += ReadFromStreamAsString(buffer);
accumulatedResult += await ReadFromPipeAsString();
}
Assert.Equal(expectedString, accumulatedResult);
}
[Fact]
public async Task UsePipeThenStreamToReadNoBytesLost()
{
CreateReader(minimumSegmentSize: 1, minimumReadThreshold: 1);
var expectedString = WriteString("abcdef");
var accumulatedResult = "";
var buffer = new byte[1];
for (var i = 0; i < expectedString.Length / 2; i++)
{
// Read from pipe then stream to guarantee no bytes are lost.
accumulatedResult += await ReadFromPipeAsString();
accumulatedResult += ReadFromStreamAsString(buffer);
}
Assert.Equal(expectedString, accumulatedResult);
}
[Fact]
public async Task UseBothStreamAndPipeToReadWithoutAdvance_StreamIgnoresAdvance()
{
var buffer = new byte[1];
CreateReader(minimumSegmentSize: 1, minimumReadThreshold: 1);
WriteString("abc");
ReadFromStreamAsString(buffer);
var readResult = await Reader.ReadAsync();
// No Advance
// Next call to Stream.Read will get the next 4 bytes rather than the bytes already read by the pipe
Assert.Equal("c", ReadFromStreamAsString(buffer));
}
[Fact]
public async Task ReadAsyncWithNoDataCompletesReader()
{
var readResult = await Reader.ReadAsync();
Assert.True(readResult.IsCompleted);
}
[Fact]
public async Task ReadAsyncWithEmptyDataCompletesStream()
{
WriteByteArray(0);
var readResult = await Reader.ReadAsync();
Assert.True(readResult.IsCompleted);
}
[Fact]
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
{
Stream = new ThrowAfterZeroByteReadStream();
Reader = new StreamPipeReader(Stream);
var readResult = await Reader.ReadAsync();
readResult = await Reader.ReadAsync();
Assert.True(readResult.Buffer.IsEmpty);
Assert.True(readResult.IsCompleted);
}
[Fact]
public void InnerStreamReturnsStream()
{
Assert.Equal(Stream, ((StreamPipeReader)Reader).InnerStream);
}
[Fact]
public async Task BufferingDataPastEndOfStreamCanBeReadAgain()
{
var helloBytes = Encoding.ASCII.GetBytes("Hello World");
Write(helloBytes);
var readResult = await Reader.ReadAsync();
var buffer = readResult.Buffer;
Reader.AdvanceTo(buffer.Start, buffer.End);
// Make sure IsCompleted is true
readResult = await Reader.ReadAsync();
buffer = readResult.Buffer;
Reader.AdvanceTo(buffer.Start, buffer.End);
Assert.True(readResult.IsCompleted);
var value = await ReadFromPipeAsString();
Assert.Equal("Hello World", value);
}
private async Task<string> ReadFromPipeAsString()
{
var readResult = await Reader.ReadAsync();
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
Reader.AdvanceTo(readResult.Buffer.End);
return result;
}
private string ReadFromStreamAsString(byte[] buffer)
{
var res = Stream.Read(buffer, 0, buffer.Length);
return Encoding.ASCII.GetString(buffer);
}
private string WriteString(string expectedString)
{
Write(Encoding.ASCII.GetBytes(expectedString));
return expectedString;
}
private void CreateReader(int minimumSegmentSize = 16, int minimumReadThreshold = 4, MemoryPool<byte> memoryPool = null)
{
Reader = new StreamPipeReader(Stream,
new StreamPipeReaderAdapterOptions(
minimumSegmentSize,
minimumReadThreshold,
memoryPool ?? new TestMemoryPool()));
}
private bool IsTaskWithResult<T>(ValueTask<T> task)
{
return task == new ValueTask<T>(task.Result);
}
private void WriteByteArray(int size)
{
Write(new byte[size]);
}
private void AppendByteArray(int size)
{
Append(new byte[size]);
}
private class AsyncStream : MemoryStream
{
private static byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await Task.Yield();
return await base.ReadAsync(buffer, offset, count, cancellationToken);
}
// Keeping as this code will eventually be ported to corefx
#if NETCOREAPP3_0
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
await Task.Yield();
return await base.ReadAsync(buffer, cancellationToken);
}
#endif
}
private class ThrowAfterZeroByteReadStream : MemoryStream
{
private bool _throwOnNextCallToRead;
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsync(new Memory<byte>(buffer, offset, count)).AsTask();
}
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
if (_throwOnNextCallToRead)
{
throw new Exception();
}
var bytes = await base.ReadAsync(destination, cancellationToken);
if (bytes == 0)
{
_throwOnNextCallToRead = true;
}
return bytes;
}
}
}
}

View File

@ -1,75 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Text;
namespace System.IO.Pipelines.Tests
{
public abstract class StreamPipeTest : IDisposable
{
protected const int MaximumSizeHigh = 65;
protected const int MinimumSegmentSize = 4096;
public Stream Stream { get; set; }
public PipeWriter Writer { get; set; }
public PipeReader Reader { get; set; }
public TestMemoryPool Pool { get; set; }
protected StreamPipeTest()
{
Pool = new TestMemoryPool();
Stream = new MemoryStream();
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, Pool);
Reader = new StreamPipeReader(Stream, new StreamPipeReaderAdapterOptions(MinimumSegmentSize, minimumReadThreshold: 256, Pool));
}
public void Dispose()
{
Writer.Complete();
Reader.Complete();
Pool.Dispose();
}
public byte[] Read()
{
Writer.FlushAsync().GetAwaiter().GetResult();
return ReadWithoutFlush();
}
public string ReadAsString()
{
Writer.FlushAsync().GetAwaiter().GetResult();
return Encoding.ASCII.GetString(ReadWithoutFlush());
}
public void Write(byte[] data)
{
Stream.Write(data, 0, data.Length);
Stream.Position = 0;
}
public void WriteWithoutPosition(byte[] data)
{
Stream.Write(data, 0, data.Length);
}
public void Append(byte[] data)
{
var originalPosition = Stream.Position;
Stream.Write(data, 0, data.Length);
Stream.Position = originalPosition;
}
public byte[] ReadWithoutFlush()
{
Stream.Position = 0;
var buffer = new byte[Stream.Length];
var result = Stream.Read(buffer, 0, (int)Stream.Length);
return buffer;
}
}
}

View File

@ -1,556 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class StreamPipeWriterTests : StreamPipeTest
{
[Fact]
public async Task CanWriteAsyncMultipleTimesIntoSameBlock()
{
await Writer.WriteAsync(new byte[] { 1 });
await Writer.WriteAsync(new byte[] { 2 });
await Writer.WriteAsync(new byte[] { 3 });
Assert.Equal(new byte[] { 1, 2, 3 }, Read());
}
[Theory]
[InlineData(100)]
[InlineData(4000)]
public async Task CanAdvanceWithPartialConsumptionOfFirstSegment(int firstWriteLength)
{
Writer = new StreamPipeWriter(Stream, MinimumSegmentSize, new TestMemoryPool(maxBufferSize: 20000));
await Writer.WriteAsync(Encoding.ASCII.GetBytes("a"));
var memory = Writer.GetMemory(firstWriteLength);
Writer.Advance(firstWriteLength);
memory = Writer.GetMemory();
Writer.Advance(memory.Length);
await Writer.FlushAsync();
Assert.Equal(firstWriteLength + memory.Length + 1, Read().Length);
}
[Fact]
public async Task WriteCanBeCancelledViaProvidedCancellationToken()
{
var pipeWriter = new StreamPipeWriter(new HangingStream());
var cts = new CancellationTokenSource(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeWriter.WriteAsync(Encoding.ASCII.GetBytes("data"), cts.Token));
}
[Fact]
public async Task WriteCanBeCanceledViaCancelPendingFlushWhenFlushIsAsync()
{
var pipeWriter = new StreamPipeWriter(new HangingStream());
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
var writingTask = pipeWriter.WriteAsync(Encoding.ASCII.GetBytes("data"));
tcs.SetResult(0);
flushResult = await writingTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
pipeWriter.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
}
[Fact]
public void FlushAsyncCompletedAfterPreCancellation()
{
PipeWriter writableBuffer = Writer.WriteEmpty(1);
Writer.CancelPendingFlush();
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
Assert.True(flushAsync.IsCompleted);
FlushResult flushResult = flushAsync.GetAwaiter().GetResult();
Assert.True(flushResult.IsCanceled);
flushAsync = writableBuffer.FlushAsync();
Assert.True(flushAsync.IsCompleted);
}
[Fact]
public async Task FlushAsyncReturnsCanceledIfCanceledBeforeFlush()
{
await CheckCanceledFlush();
}
[Fact]
public async Task FlushAsyncReturnsCanceledIfCanceledBeforeFlushMultipleTimes()
{
for (var i = 0; i < 10; i++)
{
await CheckCanceledFlush();
}
}
[Fact]
public async Task FlushAsyncReturnsCanceledInterleaved()
{
for (var i = 0; i < 5; i++)
{
await CheckCanceledFlush();
await CheckWriteIsNotCanceled();
}
}
[Fact]
public async Task CancelPendingFlushBetweenWritesAllDataIsPreserved()
{
Stream = new SingleWriteStream();
Writer = new StreamPipeWriter(Stream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
await Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
var writingTask = Writer.WriteAsync(Encoding.ASCII.GetBytes(" data"));
tcs.SetResult(0);
flushResult = await writingTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
Writer.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
await Writer.WriteAsync(Encoding.ASCII.GetBytes(" more data"));
Assert.Equal(Encoding.ASCII.GetBytes("data data more data"), Read());
}
[Fact]
public async Task CancelPendingFlushAfterAllWritesAllDataIsPreserved()
{
Stream = new CannotFlushStream();
Writer = new StreamPipeWriter(Stream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
// Create two Segments
// First one will succeed to write, other one will hang.
var writingTask = Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
tcs.SetResult(0);
flushResult = await writingTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
Writer.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
}
[Fact]
public async Task CancelPendingFlushLostOfCancellationsNoDataLost()
{
var writeSize = 16;
var singleWriteStream = new SingleWriteStream();
Stream = singleWriteStream;
Writer = new StreamPipeWriter(Stream, minimumSegmentSize: writeSize);
for (var i = 0; i < 10; i++)
{
FlushResult flushResult = new FlushResult();
var expectedData = Encoding.ASCII.GetBytes(new string('a', writeSize));
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
// Create two Segments
// First one will succeed to write, other one will hang.
for (var j = 0; j < 2; j++)
{
Writer.Write(expectedData);
}
var flushTask = Writer.FlushAsync();
tcs.SetResult(0);
flushResult = await flushTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
Writer.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
}
// Only half of the data was written because every other flush failed.
Assert.Equal(16 * 10, ReadWithoutFlush().Length);
// Start allowing all writes to make read succeed.
singleWriteStream.AllowAllWrites = true;
Assert.Equal(16 * 10 * 2, Read().Length);
}
[Fact]
public async Task UseBothStreamAndPipeToWrite()
{
await WriteStringToPipeWriter("a");
WriteStringToStream("c");
Assert.Equal("ac", ReadAsString());
}
[Fact]
public async Task UsePipeThenStreamToWriteMultipleTimes()
{
var expectedString = "abcdef";
for (var i = 0; i < expectedString.Length; i++)
{
if (i % 2 == 0)
{
WriteStringToStream(expectedString[i].ToString());
}
else
{
await WriteStringToPipeWriter(expectedString[i].ToString());
}
}
Assert.Equal(expectedString, ReadAsString());
}
[Fact]
public async Task UseStreamThenPipeToWriteMultipleTimes()
{
var expectedString = "abcdef";
for (var i = 0; i < expectedString.Length; i++)
{
if (i % 2 == 0)
{
await WriteStringToPipeWriter(expectedString[i].ToString());
}
else
{
WriteStringToStream(expectedString[i].ToString());
}
}
Assert.Equal(expectedString, ReadAsString());
}
[Fact]
public void CallCompleteWithoutFlush_ThrowsInvalidOperationException()
{
var memory = Writer.GetMemory();
Writer.Advance(memory.Length);
var ex = Assert.Throws<InvalidOperationException>(() => Writer.Complete());
Assert.Equal(ThrowHelper.CreateInvalidOperationException_DataNotAllFlushed().Message, ex.Message);
}
[Fact]
public void CallCompleteWithoutFlushAndException_DoesNotThrowInvalidOperationException()
{
var memory = Writer.GetMemory();
Writer.Advance(memory.Length);
Writer.Complete(new Exception());
}
[Fact]
public void GetMemorySameAsTheMaxPoolSizeUsesThePool()
{
var memory = Writer.GetMemory(Pool.MaxBufferSize);
Assert.Equal(Pool.MaxBufferSize, memory.Length);
Assert.Equal(1, Pool.GetRentCount());
}
[Fact]
public void GetMemoryBiggerThanPoolSizeAllocatesUnpooledArray()
{
var memory = Writer.GetMemory(Pool.MaxBufferSize + 1);
Assert.Equal(Pool.MaxBufferSize + 1, memory.Length);
Assert.Equal(0, Pool.GetRentCount());
}
[Fact]
public void CallComplete_GetMemoryThrows()
{
Writer.Complete();
Assert.Throws<InvalidOperationException>(() => Writer.GetMemory());
}
[Fact]
public void CallComplete_GetSpanThrows()
{
Writer.Complete();
Assert.Throws<InvalidOperationException>(() => Writer.GetSpan());
}
[Fact]
public void DisposeDoesNotThrowIfUnflushedData()
{
var streamPipeWriter = new StreamPipeWriter(new MemoryStream());
streamPipeWriter.Write(new byte[1]);
streamPipeWriter.Dispose();
}
[Fact]
public void CompleteAfterDisposeDoesNotThrowIfUnflushedData()
{
var streamPipeWriter = new StreamPipeWriter(new MemoryStream());
streamPipeWriter.Write(new byte[1]);
streamPipeWriter.Dispose();
streamPipeWriter.Complete();
}
[Fact]
public void CallGetMemoryWithNegativeSizeHint_ThrowsArgException()
{
Assert.Throws<ArgumentOutOfRangeException>(() => Writer.GetMemory(-1));
}
[Fact]
public void CallGetSpanWithNegativeSizeHint_ThrowsArgException()
{
Assert.Throws<ArgumentOutOfRangeException>(() => Writer.GetSpan(-1));
}
[Fact]
public async Task GetMemorySlicesCorrectly()
{
var expectedString = "abcdef";
var memory = Writer.GetMemory();
Encoding.ASCII.GetBytes("abc").CopyTo(memory);
Writer.Advance(3);
memory = Writer.GetMemory();
Encoding.ASCII.GetBytes("def").CopyTo(memory);
Writer.Advance(3);
await Writer.FlushAsync();
Assert.Equal(expectedString, ReadAsString());
}
[Fact]
public async Task GetSpanSlicesCorrectly()
{
var expectedString = "abcdef";
void NonAsyncMethod()
{
var span = Writer.GetSpan();
Encoding.ASCII.GetBytes("abc").CopyTo(span);
Writer.Advance(3);
span = Writer.GetSpan();
Encoding.ASCII.GetBytes("def").CopyTo(span);
Writer.Advance(3);
}
NonAsyncMethod();
await Writer.FlushAsync();
Assert.Equal(expectedString, ReadAsString());
}
[Fact]
public void InnerStreamReturnsStream()
{
Assert.Equal(Stream, ((StreamPipeWriter)Writer).InnerStream);
}
private void WriteStringToStream(string input)
{
var buffer = Encoding.ASCII.GetBytes(input);
Stream.Write(buffer, 0, buffer.Length);
}
private async Task WriteStringToPipeWriter(string input)
{
await Writer.WriteAsync(Encoding.ASCII.GetBytes(input));
}
private async Task CheckWriteIsNotCanceled()
{
var flushResult = await Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
Assert.False(flushResult.IsCanceled);
}
private async Task CheckCanceledFlush()
{
PipeWriter writableBuffer = Writer.WriteEmpty(MaximumSizeHigh);
Writer.CancelPendingFlush();
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
Assert.True(flushAsync.IsCompleted);
FlushResult flushResult = flushAsync.GetAwaiter().GetResult();
Assert.True(flushResult.IsCanceled);
await writableBuffer.FlushAsync();
}
}
internal class HangingStream : MemoryStream
{
public HangingStream()
{
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await Task.Delay(30000, cancellationToken);
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await Task.Delay(30000, cancellationToken);
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await Task.Delay(30000, cancellationToken);
return 0;
}
// Keeping as this code will eventually be ported to corefx
#if NETCOREAPP3_0
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
await Task.Delay(30000, cancellationToken);
return 0;
}
#endif
}
internal class SingleWriteStream : MemoryStream
{
private bool _shouldNextWriteFail;
public bool AllowAllWrites { get; set; }
// Keeping as this code will eventually be ported to corefx
#if NETCOREAPP3_0
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
try
{
if (_shouldNextWriteFail && !AllowAllWrites)
{
await Task.Delay(30000, cancellationToken);
}
else
{
await base.WriteAsync(source, cancellationToken);
}
}
finally
{
_shouldNextWriteFail = !_shouldNextWriteFail;
}
}
#endif
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
try
{
if (_shouldNextWriteFail && !AllowAllWrites)
{
await Task.Delay(30000, cancellationToken);
}
await base.WriteAsync(buffer, offset, count, cancellationToken);
}
finally
{
_shouldNextWriteFail = !_shouldNextWriteFail;
}
}
}
internal class CannotFlushStream : MemoryStream
{
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await Task.Delay(30000, cancellationToken);
}
}
internal static class TestWriterExtensions
{
public static PipeWriter WriteEmpty(this PipeWriter Writer, int count)
{
Writer.GetSpan(count).Slice(0, count).Fill(0);
Writer.Advance(count);
return Writer;
}
}
}

View File

@ -1,206 +0,0 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
namespace System.IO.Pipelines.Tests
{
public class TestMemoryPool : MemoryPool<byte>
{
private MemoryPool<byte> _pool;
private int _maxBufferSize;
private bool _disposed;
private int _rentCount;
public TestMemoryPool(int maxBufferSize = 4096)
{
_pool = new CustomMemoryPool<byte>();
_maxBufferSize = maxBufferSize;
}
public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
{
CheckDisposed();
_rentCount++;
return new PooledMemory(_pool.Rent(minBufferSize), this);
}
public int GetRentCount()
{
return _rentCount;
}
protected override void Dispose(bool disposing)
{
_disposed = true;
}
public override int MaxBufferSize => _maxBufferSize;
internal void CheckDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(TestMemoryPool));
}
}
private class PooledMemory : MemoryManager<byte>
{
private IMemoryOwner<byte> _owner;
private readonly TestMemoryPool _pool;
private int _referenceCount;
private bool _returned;
private string _leaser;
public PooledMemory(IMemoryOwner<byte> owner, TestMemoryPool pool)
{
_owner = owner;
_pool = pool;
_leaser = Environment.StackTrace;
_referenceCount = 1;
}
~PooledMemory()
{
Debug.Assert(_returned, "Block being garbage collected instead of returned to pool" + Environment.NewLine + _leaser);
}
protected override void Dispose(bool disposing)
{
_pool._rentCount--;
_pool.CheckDisposed();
}
public override MemoryHandle Pin(int elementIndex = 0)
{
_pool.CheckDisposed();
Interlocked.Increment(ref _referenceCount);
if (!MemoryMarshal.TryGetArray(_owner.Memory, out ArraySegment<byte> segment))
{
throw new InvalidOperationException();
}
unsafe
{
try
{
if ((uint)elementIndex > (uint)segment.Count)
{
throw new ArgumentOutOfRangeException(nameof(elementIndex));
}
GCHandle handle = GCHandle.Alloc(segment.Array, GCHandleType.Pinned);
return new MemoryHandle(Unsafe.Add<byte>(((void*)handle.AddrOfPinnedObject()), elementIndex + segment.Offset), handle, this);
}
catch
{
Unpin();
throw;
}
}
}
public override void Unpin()
{
_pool.CheckDisposed();
int newRefCount = Interlocked.Decrement(ref _referenceCount);
if (newRefCount < 0)
throw new InvalidOperationException();
if (newRefCount == 0)
{
_returned = true;
}
}
protected override bool TryGetArray(out ArraySegment<byte> segment)
{
_pool.CheckDisposed();
return MemoryMarshal.TryGetArray(_owner.Memory, out segment);
}
public override Memory<byte> Memory
{
get
{
_pool.CheckDisposed();
return _owner.Memory;
}
}
public override Span<byte> GetSpan()
{
_pool.CheckDisposed();
return _owner.Memory.Span;
}
}
private class CustomMemoryPool<T> : MemoryPool<T>
{
public override int MaxBufferSize => int.MaxValue;
public override IMemoryOwner<T> Rent(int minimumBufferSize = -1)
{
if (minimumBufferSize == -1)
{
minimumBufferSize = 4096;
}
return new ArrayMemoryPoolBuffer(minimumBufferSize);
}
protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
}
private sealed class ArrayMemoryPoolBuffer : IMemoryOwner<T>
{
private T[] _array;
public ArrayMemoryPoolBuffer(int size)
{
_array = new T[size];
}
public Memory<T> Memory
{
get
{
T[] array = _array;
if (array == null)
{
throw new ObjectDisposedException(nameof(array));
}
return new Memory<T>(array);
}
}
public void Dispose()
{
T[] array = _array;
if (array != null)
{
_array = null;
}
}
}
}
}
}

View File

@ -1,206 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class WriteOnlyPipeStreamTests : PipeStreamTest
{
[Fact]
public void CanSeekFalse()
{
Assert.False(WritingStream.CanSeek);
}
[Fact]
public void CanReadFalse()
{
Assert.False(WritingStream.CanRead);
}
[Fact]
public void CanWriteTrue()
{
Assert.True(WritingStream.CanWrite);
}
[Fact]
public void LengthThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Length);
}
[Fact]
public void PositionThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Position);
Assert.Throws<NotSupportedException>(() => WritingStream.Position = 1);
}
[Fact]
public void SeekThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Seek(0, SeekOrigin.Begin));
}
[Fact]
public void SetLengthThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.SetLength(1));
}
[Fact]
public void ReadThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.Read(new byte[1], 0, 1));
}
[Fact]
public async Task ReadAsyncThrows()
{
await Assert.ThrowsAsync<NotSupportedException>(async () => await WritingStream.ReadAsync(new byte[1], 0, 1));
}
[Fact]
public void ReadTimeoutThrows()
{
Assert.Throws<NotSupportedException>(() => WritingStream.ReadTimeout = 1);
Assert.Throws<NotSupportedException>(() => WritingStream.ReadTimeout);
}
[Fact]
public async Task WriteAsyncWithReadOnlyMemoryWorks()
{
var expected = "Hello World!";
await WriteStringToStreamAsync(expected);
Assert.Equal(expected, await ReadFromPipeAsStringAsync());
}
[Fact]
public async Task WriteAsyncWithArrayWorks()
{
var expected = new byte[1];
await WritingStream.WriteAsync(expected, 0, expected.Length);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public async Task BasicLargeWrite()
{
var expected = new byte[8000];
await WritingStream.WriteAsync(expected);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public void FlushAsyncIsCalledFromCallingFlush()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.Flush();
pipeWriter.Verify(m => m.FlushAsync(default));
}
[Fact]
public async Task FlushAsyncIsCalledFromCallingFlushAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
await stream.FlushAsync();
pipeWriter.Verify(m => m.FlushAsync(default));
}
[Fact]
public async Task FlushAsyncCancellationTokenIsPassedIntoFlushAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
var token = new CancellationToken();
await stream.FlushAsync(token);
pipeWriter.Verify(m => m.FlushAsync(token));
}
[Fact]
public void WriteAsyncIsCalledFromCallingWrite()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.Write(new byte[1]);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task WriteAsyncIsCalledFromCallingWriteAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
await stream.WriteAsync(new byte[1]);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task WriteAsyncCancellationTokenIsPassedIntoWriteAsync()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
var token = new CancellationToken();
await stream.WriteAsync(new byte[1], token);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), token));
}
[Fact]
public void WriteAsyncIsCalledFromBeginWrite()
{
var pipeWriter = new Mock<PipeWriter>();
var stream = new WriteOnlyPipeStream(pipeWriter.Object);
stream.BeginWrite(new byte[1], 0, 1, null, this);
pipeWriter.Verify(m => m.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<CancellationToken>()));
}
[Fact]
public async Task BeginAndEndWriteWork()
{
var expected = new byte[1];
var asyncResult = WritingStream.BeginWrite(expected, 0, 1, null, this);
WritingStream.EndWrite(asyncResult);
Assert.Equal(expected, await ReadFromPipeAsByteArrayAsync());
}
[Fact]
public void BlockSyncIOThrows()
{
var writeOnlyPipeStream = new WriteOnlyPipeStream(Writer, allowSynchronousIO: false);
Assert.Throws<InvalidOperationException>(() => writeOnlyPipeStream.Write(new byte[0], 0, 0));
Assert.Throws<InvalidOperationException>(() => writeOnlyPipeStream.Flush());
}
[Fact]
public void InnerPipeWriterReturnsPipeWriter()
{
var writeOnlyPipeStream = new WriteOnlyPipeStream(Writer, allowSynchronousIO: false);
Assert.Equal(Writer, writeOnlyPipeStream.InnerPipeWriter);
}
}
}

View File

@ -1,155 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class WritingAdaptersInteropTests : PipeStreamTest
{
[Fact]
public async Task CheckBasicWritePipeApi()
{
var pipe = new Pipe();
var writeOnlyStream = new WriteOnlyPipeStream(pipe.Writer);
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
await pipeWriter.WriteAsync(new byte[10]);
var res = await pipe.Reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckNestedPipeApi()
{
var pipe = new Pipe();
var writer = pipe.Writer;
for (var i = 0; i < 3; i++)
{
var writeOnlyStream = new WriteOnlyPipeStream(writer);
writer = new StreamPipeWriter(writeOnlyStream);
}
await writer.WriteAsync(new byte[10]);
var res = await pipe.Reader.ReadAsync();
Assert.Equal(new byte[10], res.Buffer.ToArray());
}
[Fact]
public async Task CheckBasicWriteStreamApi()
{
var stream = new MemoryStream();
var pipeWriter = new StreamPipeWriter(stream);
var writeOnlyStream = new WriteOnlyPipeStream(pipeWriter);
await writeOnlyStream.WriteAsync(new byte[10]);
stream.Position = 0;
var res = await ReadFromStreamAsByteArrayAsync(10, stream);
Assert.Equal(new byte[10], res);
}
[Fact]
public async Task CheckNestedStreamApi()
{
var stream = new MemoryStream();
Stream writeOnlyStream = stream;
for (var i = 0; i < 3; i++)
{
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
writeOnlyStream = new WriteOnlyPipeStream(pipeWriter);
}
await writeOnlyStream.WriteAsync(new byte[10]);
stream.Position = 0;
var res = await ReadFromStreamAsByteArrayAsync(10, stream);
Assert.Equal(new byte[10], res);
}
[Fact]
public async Task WritesCanBeCanceledViaProvidedCancellationToken()
{
var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter());
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
var cts = new CancellationTokenSource(1);
await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeWriter.WriteAsync(new byte[1], cts.Token));
}
[Fact]
public async Task WriteCanBeCanceledViaCancelPendingFlushWhenFlushIsAsync()
{
var writeOnlyStream = new WriteOnlyPipeStream(new HangingPipeWriter());
var pipeWriter = new StreamPipeWriter(writeOnlyStream);
FlushResult flushResult = new FlushResult();
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var task = Task.Run(async () =>
{
try
{
var writingTask = pipeWriter.WriteAsync(new byte[1]);
tcs.SetResult(0);
flushResult = await writingTask;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
});
await tcs.Task;
pipeWriter.CancelPendingFlush();
await task;
Assert.True(flushResult.IsCanceled);
}
private class HangingPipeWriter : PipeWriter
{
public override void Advance(int bytes)
{
}
public override void CancelPendingFlush()
{
throw new NotImplementedException();
}
public override void Complete(Exception exception = null)
{
throw new NotImplementedException();
}
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
await Task.Delay(30000, cancellationToken);
return new FlushResult();
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
return new Memory<byte>(new byte[4096]);
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
return new Span<byte>(new byte[4096]);
}
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
throw new NotImplementedException();
}
}
}
}

View File

@ -98,39 +98,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
Stream IHttpRequestFeature.Body
{
get
{
return RequestBody;
}
set
{
RequestBody = value;
var requestPipeReader = new StreamPipeReader(RequestBody, new StreamPipeReaderAdapterOptions(
minimumSegmentSize: _context.MemoryPool.GetMinimumSegmentSize(),
minimumReadThreshold: _context.MemoryPool.GetMinimumAllocSize(),
_context.MemoryPool));
RequestBodyPipeReader = requestPipeReader;
// The StreamPipeWrapper needs to be disposed as it hold onto blocks of memory
if (_wrapperObjectsToDispose == null)
{
_wrapperObjectsToDispose = new List<IDisposable>();
}
_wrapperObjectsToDispose.Add(requestPipeReader);
}
get => RequestBody;
set => RequestBody = value;
}
PipeReader IRequestBodyPipeFeature.Reader
{
get
{
if (!ReferenceEquals(_requestStreamInternal, RequestBody))
{
_requestStreamInternal = RequestBody;
RequestBodyPipeReader = PipeReader.Create(RequestBody);
OnCompleted((self) =>
{
((PipeWriter)self).Complete();
return Task.CompletedTask;
}, RequestBodyPipeReader);
}
return RequestBodyPipeReader;
}
set
{
RequestBodyPipeReader = value;
RequestBody = new ReadOnlyPipeStream(RequestBodyPipeReader);
}
}
bool IHttpRequestTrailersFeature.Available => RequestTrailersAvailable;
@ -241,37 +230,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
}
}
Stream IHttpResponseFeature.Body
{
get => ResponseBody;
set => ResponseBody = value;
}
PipeWriter IResponseBodyPipeFeature.Writer
{
get
{
return ResponsePipeWriter;
}
set
{
ResponsePipeWriter = value;
ResponseBody = new WriteOnlyPipeStream(ResponsePipeWriter);
}
}
Stream IHttpResponseFeature.Body
{
get
{
return ResponseBody;
}
set
{
ResponseBody = value;
var responsePipeWriter = new StreamPipeWriter(ResponseBody, minimumSegmentSize: _context.MemoryPool.GetMinimumSegmentSize(), _context.MemoryPool);
ResponsePipeWriter = responsePipeWriter;
// The StreamPipeWrapper needs to be disposed as it hold onto blocks of memory
if (_wrapperObjectsToDispose == null)
if (!ReferenceEquals(_responseStreamInternal, ResponseBody))
{
_wrapperObjectsToDispose = new List<IDisposable>();
_responseStreamInternal = ResponseBody;
ResponseBodyPipeWriter = PipeWriter.Create(ResponseBody);
OnCompleted((self) =>
{
((PipeReader)self).Complete();
return Task.CompletedTask;
}, ResponseBodyPipeWriter);
}
_wrapperObjectsToDispose.Add(responsePipeWriter);
return ResponseBodyPipeWriter;
}
}

View File

@ -70,8 +70,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
protected string _methodText = null;
private string _scheme = null;
private List<IDisposable> _wrapperObjectsToDispose;
private Stream _requestStreamInternal;
private Stream _responseStreamInternal;
public HttpProtocol(HttpConnectionContext context)
{
@ -245,7 +245,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public IHeaderDictionary ResponseHeaders { get; set; }
public Stream ResponseBody { get; set; }
public PipeWriter ResponsePipeWriter { get; set; }
public PipeWriter ResponseBodyPipeWriter { get; set; }
public CancellationToken RequestAborted
{
@ -317,7 +317,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
bodyControl = new BodyControl(bodyControl: this, this);
}
(RequestBody, ResponseBody, RequestBodyPipeReader, ResponsePipeWriter) = bodyControl.Start(messageBody);
(RequestBody, ResponseBody, RequestBodyPipeReader, ResponseBodyPipeWriter) = bodyControl.Start(messageBody);
_requestStreamInternal = RequestBody;
_responseStreamInternal = ResponseBody;
}
public void StopBodies() => bodyControl.Stop();
@ -400,14 +402,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
localAbortCts?.Dispose();
if (_wrapperObjectsToDispose != null)
{
foreach (var disposable in _wrapperObjectsToDispose)
{
disposable.Dispose();
}
}
Output?.Reset();
_requestHeadersParsed = 0;

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
@ -11,26 +12,45 @@ using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
internal sealed class HttpRequestStream : ReadOnlyPipeStream
internal sealed class HttpRequestStream : Stream
{
private HttpRequestPipeReader _pipeReader;
private readonly IHttpBodyControlFeature _bodyControl;
public HttpRequestStream(IHttpBodyControlFeature bodyControl, HttpRequestPipeReader pipeReader)
: base (pipeReader)
{
_bodyControl = bodyControl;
_pipeReader = pipeReader;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override bool CanSeek => false;
public override bool CanRead => true;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override int WriteTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination, cancellationToken);
return ReadAsyncWrapper(destination, cancellationToken);
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncWrapper(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
public override int Read(byte[] buffer, int offset, int count)
@ -43,16 +63,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin)
{
try
{
return base.ReadAsync(buffer, cancellationToken);
}
catch (ConnectionAbortedException ex)
{
throw new TaskCanceledException("The request was aborted", ex);
}
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Flush()
@ -63,5 +88,144 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
return Task.CompletedTask;
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = ReadAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
/// <inheritdoc />
public override int EndRead(IAsyncResult asyncResult)
{
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = ReadAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(task2.Result);
}
}, tcs, cancellationToken);
return tcs.Task;
}
private ValueTask<int> ReadAsyncWrapper(Memory<byte> destination, CancellationToken cancellationToken)
{
try
{
return ReadAsyncInternal(destination, cancellationToken);
}
catch (ConnectionAbortedException ex)
{
throw new TaskCanceledException("The request was aborted", ex);
}
}
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
if (result.IsCanceled)
{
throw new OperationCanceledException("The read was canceled");
}
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
var consumed = readableBuffer.End;
var actual = 0;
try
{
if (readableBufferLength != 0)
{
actual = (int)Math.Min(readableBufferLength, buffer.Length);
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
return actual;
}
if (result.IsCompleted)
{
return 0;
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
}
}
}
/// <inheritdoc />
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
if (destination == null)
{
throw new ArgumentNullException(nameof(destination));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize));
}
return CopyToAsyncInternal(destination, cancellationToken);
}
private async Task CopyToAsyncInternal(Stream destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;
try
{
if (readableBufferLength != 0)
{
foreach (var memory in readableBuffer)
{
await destination.WriteAsync(memory, cancellationToken);
}
}
if (result.IsCompleted)
{
return;
}
}
finally
{
_pipeReader.AdvanceTo(readableBuffer.End);
}
}
}
}
}

View File

@ -2,33 +2,52 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
internal sealed class HttpResponseStream : WriteOnlyPipeStream
internal sealed class HttpResponseStream : Stream
{
private readonly HttpResponsePipeWriter _pipeWriter;
private readonly IHttpBodyControlFeature _bodyControl;
public HttpResponseStream(IHttpBodyControlFeature bodyControl, HttpResponsePipeWriter pipeWriter)
: base(pipeWriter)
{
_bodyControl = bodyControl;
_pipeWriter = pipeWriter;
}
public override void Write(byte[] buffer, int offset, int count)
{
if (!_bodyControl.AllowSynchronousIO)
{
throw new InvalidOperationException(CoreStrings.SynchronousWritesDisallowed);
}
public override bool CanSeek => false;
base.Write(buffer, offset, count);
public override bool CanRead => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override int ReadTimeout
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotSupportedException();
public override void Flush()
{
if (!_bodyControl.AllowSynchronousIO)
@ -36,7 +55,87 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
throw new InvalidOperationException(CoreStrings.SynchronousWritesDisallowed);
}
base.Flush();
FlushAsync(default).GetAwaiter().GetResult();
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return _pipeWriter.FlushAsync(cancellationToken).GetAsTask();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (!_bodyControl.AllowSynchronousIO)
{
throw new InvalidOperationException(CoreStrings.SynchronousWritesDisallowed);
}
WriteAsync(buffer, offset, count, default).GetAwaiter().GetResult();
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
var task = WriteAsync(buffer, offset, count, default, state);
if (callback != null)
{
task.ContinueWith(t => callback.Invoke(t));
}
return task;
}
public override void EndWrite(IAsyncResult asyncResult)
{
((Task<object>)asyncResult).GetAwaiter().GetResult();
}
private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<object>)state2;
if (task2.IsCanceled)
{
tcs2.SetCanceled();
}
else if (task2.IsFaulted)
{
tcs2.SetException(task2.Exception);
}
else
{
tcs2.SetResult(null);
}
}, tcs, cancellationToken);
return tcs.Task;
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return new ValueTask(WriteAsyncInternal(source, cancellationToken));
}
private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return _pipeWriter.WriteAsync(source, cancellationToken).GetAsTask();
}
}
}

View File

@ -57,30 +57,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
[Fact]
public async Task PipesAreNotPersistedBySettingStreamPipeWriterAcrossRequests()
{
var responseBodyPersisted = false;
PipeWriter bodyPipe = null;
await using (var server = new TestServer(async context =>
{
if (context.Response.BodyWriter == bodyPipe)
{
responseBodyPersisted = true;
}
bodyPipe = new StreamPipeWriter(new MemoryStream());
context.Response.BodyWriter = bodyPipe;
await context.Response.WriteAsync("hello, world");
}, new TestServiceContext(LoggerFactory)))
{
Assert.Equal(string.Empty, await server.HttpClientSlim.GetStringAsync($"http://localhost:{server.Port}/"));
Assert.Equal(string.Empty, await server.HttpClientSlim.GetStringAsync($"http://localhost:{server.Port}/"));
Assert.False(responseBodyPersisted);
}
}
[Fact]
public async Task PipesAreNotPersistedAcrossRequests()
{

View File

@ -3851,45 +3851,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
[Fact]
public async Task ResponseSetBodyAndPipeBodyIsWrapped()
{
await using (var server = new TestServer(async httpContext =>
{
httpContext.Response.Body = new MemoryStream();
httpContext.Response.BodyWriter = new Pipe().Writer;
Assert.IsType<WriteOnlyPipeStream>(httpContext.Response.Body);
await Task.CompletedTask;
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"GET / HTTP/1.1",
"Host:",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}
[Fact]
public async Task ResponseSetBodyToSameValueTwiceGetPipeMultipleTimesDifferentObject()
{
await using (var server = new TestServer(async httpContext =>
{
var memoryStream = new MemoryStream();
httpContext.Response.Body = memoryStream;
httpContext.Response.Body = new MemoryStream();
var BodyWriter1 = httpContext.Response.BodyWriter;
httpContext.Response.Body = memoryStream;
httpContext.Response.Body = new MemoryStream();
var BodyWriter2 = httpContext.Response.BodyWriter;
Assert.NotEqual(BodyWriter1, BodyWriter2);
@ -3913,104 +3883,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
[Fact]
public async Task ResponseSetPipeToSameValueTwiceGetBodyMultipleTimesDifferent()
{
await using (var server = new TestServer(async httpContext =>
{
var pipeWriter = new Pipe().Writer;
httpContext.Response.BodyWriter = pipeWriter;
var body1 = httpContext.Response.Body;
httpContext.Response.BodyWriter = pipeWriter;
var body2 = httpContext.Response.Body;
Assert.NotEqual(body1, body2);
await Task.CompletedTask;
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"GET / HTTP/1.1",
"Host:",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}
[Fact]
public async Task ResponseSetPipeAndBodyWriterIsWrapped()
{
await using (var server = new TestServer(async httpContext =>
{
httpContext.Response.BodyWriter = new Pipe().Writer;
httpContext.Response.Body = new MemoryStream();
Assert.IsType<StreamPipeWriter>(httpContext.Response.BodyWriter);
Assert.IsType<MemoryStream>(httpContext.Response.Body);
await Task.CompletedTask;
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"GET / HTTP/1.1",
"Host:",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}
[Fact]
public async Task ResponseWriteToBodyWriterAndStreamAllBlocksDisposed()
{
await using (var server = new TestServer(async httpContext =>
{
for (var i = 0; i < 3; i++)
{
httpContext.Response.BodyWriter = new Pipe().Writer;
await httpContext.Response.Body.WriteAsync(new byte[1]);
httpContext.Response.Body = new MemoryStream();
await httpContext.Response.BodyWriter.WriteAsync(new byte[1]);
}
// TestMemoryPool will confirm that all rented blocks have been disposed, meaning dispose was called.
await Task.CompletedTask;
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"GET / HTTP/1.1",
"Host:",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}
[Fact]
public async Task ResponseStreamWrappingWorks()
{
@ -4026,10 +3898,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
httpContext.Response.Body = oldBody;
// Even though we are restoring the original response body, we will create a
// wrapper rather than restoring the original pipe.
Assert.IsType<StreamPipeWriter>(httpContext.Response.BodyWriter);
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
@ -4040,50 +3908,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}
[Fact]
public async Task ResponsePipeWrappingWorks()
{
await using (var server = new TestServer(async httpContext =>
{
var oldPipeWriter = httpContext.Response.BodyWriter;
var pipe = new Pipe();
httpContext.Response.BodyWriter = pipe.Writer;
await httpContext.Response.Body.WriteAsync(new byte[1]);
await httpContext.Response.BodyWriter.WriteAsync(new byte[1]);
var readResult = await pipe.Reader.ReadAsync();
Assert.Equal(2, readResult.Buffer.Length);
httpContext.Response.BodyWriter = oldPipeWriter;
// Even though we are restoring the original response body, we will create a
// wrapper rather than restoring the original pipe.
Assert.IsType<WriteOnlyPipeStream>(httpContext.Response.Body);
}, new TestServiceContext(LoggerFactory)))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"GET / HTTP/1.1",
"Host:",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
"HTTP/1.1 200 OK",
$"Date: {server.Context.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}
}
}