Convert TestServer to pipes #11598 (#11611)

This commit is contained in:
Chris Ross 2019-06-27 16:15:59 -07:00 committed by GitHub
parent 2508dfcd2b
commit 1c0014c539
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 442 additions and 271 deletions

View File

@ -64,7 +64,6 @@ namespace Microsoft.AspNetCore.TestHost
var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext);
Stream responseBody = null;
var requestContent = request.Content ?? new StreamContent(Stream.Null);
var body = await requestContent.ReadAsStreamAsync();
contextBuilder.Configure(context =>
@ -114,8 +113,6 @@ namespace Microsoft.AspNetCore.TestHost
body.Seek(0, SeekOrigin.Begin);
}
req.Body = new AsyncStreamWrapper(body, () => contextBuilder.AllowSynchronousIO);
responseBody = context.Response.Body;
});
var response = new HttpResponseMessage();
@ -138,7 +135,7 @@ namespace Microsoft.AspNetCore.TestHost
response.ReasonPhrase = httpContext.Features.Get<IHttpResponseFeature>().ReasonPhrase;
response.RequestMessage = request;
response.Content = new StreamContent(responseBody);
response.Content = new StreamContent(httpContext.Response.Body);
foreach (var header in httpContext.Response.Headers)
{

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
@ -16,7 +17,8 @@ namespace Microsoft.AspNetCore.TestHost
private readonly HttpContext _httpContext;
private readonly TaskCompletionSource<HttpContext> _responseTcs = new TaskCompletionSource<HttpContext>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly ResponseStream _responseStream;
private readonly ResponseBodyReaderStream _responseReaderStream;
private readonly ResponseBodyPipeWriter _responsePipeWriter;
private readonly ResponseFeature _responseFeature;
private readonly RequestLifetimeFeature _requestLifetimeFeature = new RequestLifetimeFeature();
private readonly ResponseTrailersFeature _responseTrailersFeature = new ResponseTrailersFeature();
@ -37,14 +39,19 @@ namespace Microsoft.AspNetCore.TestHost
request.Protocol = "HTTP/1.1";
request.Method = HttpMethods.Get;
var pipe = new Pipe();
_responseReaderStream = new ResponseBodyReaderStream(pipe, AbortRequest, () => _responseReadCompleteCallback?.Invoke(_httpContext));
_responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync);
_responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO);
_responseFeature.BodySnapshot = _responseFeature.Body;
_responseFeature.BodyWriter = _responsePipeWriter;
_httpContext.Features.Set<IHttpBodyControlFeature>(this);
_httpContext.Features.Set<IHttpResponseFeature>(_responseFeature);
_httpContext.Features.Set<IHttpResponseStartFeature>(_responseFeature);
_httpContext.Features.Set<IHttpRequestLifetimeFeature>(_requestLifetimeFeature);
_httpContext.Features.Set<IHttpResponseTrailersFeature>(_responseTrailersFeature);
_responseStream = new ResponseStream(ReturnResponseMessageAsync, AbortRequest, () => AllowSynchronousIO, () => _responseReadCompleteCallback?.Invoke(_httpContext));
_responseFeature.Body = _responseStream;
_httpContext.Features.Set<IResponseBodyPipeFeature>(_responseFeature);
}
public bool AllowSynchronousIO { get; set; }
@ -119,14 +126,14 @@ namespace Microsoft.AspNetCore.TestHost
{
_requestLifetimeFeature.Abort();
}
_responseStream.CompleteWrites();
_responsePipeWriter.Complete();
}
internal async Task CompleteResponseAsync()
{
_pipelineFinished = true;
await ReturnResponseMessageAsync();
_responseStream.CompleteWrites();
_responsePipeWriter.Complete();
await _responseFeature.FireOnResponseCompletedAsync();
}
@ -155,6 +162,16 @@ namespace Microsoft.AspNetCore.TestHost
{
newFeatures[pair.Key] = pair.Value;
}
var serverResponseFeature = _httpContext.Features.Get<IHttpResponseFeature>();
// The client gets a deep copy of this so they can interact with the body stream independently of the server.
var clientResponseFeature = new HttpResponseFeature()
{
StatusCode = serverResponseFeature.StatusCode,
ReasonPhrase = serverResponseFeature.ReasonPhrase,
Headers = serverResponseFeature.Headers,
Body = _responseReaderStream
};
newFeatures.Set<IHttpResponseFeature>(clientResponseFeature);
_responseTcs.TrySetResult(new DefaultHttpContext(newFeatures));
}
}
@ -162,7 +179,8 @@ namespace Microsoft.AspNetCore.TestHost
internal void Abort(Exception exception)
{
_pipelineFinished = true;
_responseStream.Abort(exception);
_responsePipeWriter.Abort(exception);
_responseReaderStream.Abort(exception);
_responseTcs.TrySetException(exception);
}
}

View File

@ -0,0 +1,107 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.TestHost
{
internal class ResponseBodyPipeWriter : PipeWriter
{
private readonly Func<Task> _onFirstWriteAsync;
private readonly Pipe _pipe;
private bool _firstWrite;
private bool _complete;
internal ResponseBodyPipeWriter(Pipe pipe, Func<Task> onFirstWriteAsync)
{
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
_onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync));
_firstWrite = true;
}
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
CheckNotComplete();
await FirstWriteAsync();
return await _pipe.Writer.FlushAsync(cancellationToken);
}
private Task FirstWriteAsync()
{
if (_firstWrite)
{
_firstWrite = false;
return _onFirstWriteAsync();
}
return Task.CompletedTask;
}
internal void Abort(Exception innerException)
{
Contract.Requires(innerException != null);
_complete = true;
_pipe.Writer.Complete(new IOException(string.Empty, innerException));
}
internal void Complete()
{
if (_complete)
{
return;
}
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
_complete = true;
_pipe.Writer.Complete();
}
private void CheckNotComplete()
{
if (_complete)
{
throw new IOException("The request was aborted or the pipeline has finished.");
}
}
public override void Complete(Exception exception = null)
{
// No-op in the non-error case
if (exception != null)
{
Abort(exception);
}
}
public override void CancelPendingFlush() => _pipe.Writer.CancelPendingFlush();
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
=> _pipe.Writer.OnReaderCompleted(callback, state);
public override void Advance(int bytes)
{
CheckNotComplete();
_pipe.Writer.Advance(bytes);
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
CheckNotComplete();
return _pipe.Writer.GetMemory(sizeHint);
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
CheckNotComplete();
return _pipe.Writer.GetSpan(sizeHint);
}
}
}

View File

@ -0,0 +1,146 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.TestHost
{
/// <summary>
/// The client's view of the response body.
/// </summary>
internal class ResponseBodyReaderStream : Stream
{
private bool _readerComplete;
private bool _aborted;
private Exception _abortException;
private readonly Action _abortRequest;
private readonly Action _readComplete;
private readonly Pipe _pipe;
internal ResponseBodyReaderStream(Pipe pipe, Action abortRequest, Action readComplete)
{
_pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
_abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
_readComplete = readComplete;
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
#region NotSupported
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Flush() => throw new NotSupportedException();
public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
// Write with count 0 will still trigger OnFirstWrite
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw new NotSupportedException();
#endregion NotSupported
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count);
CheckAborted();
if (_readerComplete)
{
return 0;
}
using var registration = cancellationToken.Register(Cancel);
var result = await _pipe.Reader.ReadAsync(cancellationToken);
if (result.Buffer.IsEmpty && result.IsCompleted)
{
_pipe.Reader.Complete();
_readComplete();
_readerComplete = true;
return 0;
}
var readableBuffer = result.Buffer;
var actual = Math.Min(readableBuffer.Length, count);
readableBuffer = readableBuffer.Slice(0, actual);
readableBuffer.CopyTo(new Span<byte>(buffer, offset, count));
_pipe.Reader.AdvanceTo(readableBuffer.End);
return (int)actual;
}
private static void VerifyBuffer(byte[] buffer, int offset, int count)
{
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if (offset < 0 || offset > buffer.Length)
{
throw new ArgumentOutOfRangeException("offset", offset, string.Empty);
}
if (count <= 0 || count > buffer.Length - offset)
{
throw new ArgumentOutOfRangeException("count", count, string.Empty);
}
}
internal void Cancel()
{
_aborted = true;
_abortException = new OperationCanceledException();
_pipe.Writer.Complete(_abortException);
}
internal void Abort(Exception innerException)
{
Contract.Requires(innerException != null);
_aborted = true;
_abortException = innerException;
}
private void CheckAborted()
{
if (_aborted)
{
throw new IOException(string.Empty, _abortException);
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_abortRequest();
}
base.Dispose(disposing);
}
}
}

View File

@ -0,0 +1,73 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.TestHost
{
internal class ResponseBodyWriterStream : Stream
{
private readonly ResponseBodyPipeWriter _responseWriter;
private readonly Func<bool> _allowSynchronousIO;
public ResponseBodyWriterStream(ResponseBodyPipeWriter responseWriter, Func<bool> allowSynchronousIO)
{
_responseWriter = responseWriter;
_allowSynchronousIO = allowSynchronousIO;
}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Flush()
{
FlushAsync().GetAwaiter().GetResult();
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await _responseWriter.FlushAsync(cancellationToken);
}
public override void Write(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO())
{
throw new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set AllowSynchronousIO to true.");
}
// The Pipe Write method requires calling FlushAsync to notify the reader. Call WriteAsync instead.
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _responseWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
}
}

View File

@ -3,6 +3,7 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
@ -10,7 +11,7 @@ using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.TestHost
{
internal class ResponseFeature : IHttpResponseFeature, IHttpResponseStartFeature
internal class ResponseFeature : IHttpResponseFeature, IHttpResponseStartFeature, IResponseBodyPipeFeature
{
private readonly HeaderDictionary _headers = new HeaderDictionary();
private readonly Action<Exception> _abort;
@ -67,6 +68,30 @@ namespace Microsoft.AspNetCore.TestHost
public Stream Body { get; set; }
internal Stream BodySnapshot { get; set; }
internal PipeWriter BodyWriter { get; set; }
public PipeWriter Writer
{
get
{
if (!ReferenceEquals(BodySnapshot, Body))
{
BodySnapshot = Body;
BodyWriter = PipeWriter.Create(Body);
OnCompleted((self) =>
{
((PipeWriter)self).Complete();
return Task.CompletedTask;
}, BodyWriter);
}
return BodyWriter;
}
}
public bool HasStarted { get; set; }
public void OnStarting(Func<object, Task> callback, object state)

View File

@ -1,259 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.TestHost
{
// This steam accepts writes from the server/app, buffers them internally, and returns the data via Reads
// when requested by the client.
internal class ResponseStream : Stream
{
private bool _complete;
private bool _readerComplete;
private bool _aborted;
private Exception _abortException;
private bool _firstWrite;
private readonly SemaphoreSlim _writeLock;
private readonly Func<Task> _onFirstWriteAsync;
private readonly Action _abortRequest;
private readonly Func<bool> _allowSynchronousIO;
private readonly Action _readComplete;
private readonly Pipe _pipe = new Pipe();
internal ResponseStream(Func<Task> onFirstWriteAsync, Action abortRequest, Func<bool> allowSynchronousIO, Action readComplete)
{
_onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync));
_abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
_allowSynchronousIO = allowSynchronousIO ?? throw new ArgumentNullException(nameof(allowSynchronousIO));
_readComplete = readComplete;
_firstWrite = true;
_writeLock = new SemaphoreSlim(1, 1);
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
#region NotSupported
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
#endregion NotSupported
public override void Flush()
{
FlushAsync().GetAwaiter().GetResult();
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
CheckNotComplete();
await _writeLock.WaitAsync(cancellationToken);
try
{
await FirstWriteAsync();
await _pipe.Writer.FlushAsync(cancellationToken);
}
finally
{
_writeLock.Release();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count, allowEmpty: false);
CheckAborted();
if (_readerComplete)
{
return 0;
}
var registration = cancellationToken.Register(Cancel);
try
{
var result = await _pipe.Reader.ReadAsync(cancellationToken);
if (result.Buffer.IsEmpty && result.IsCompleted)
{
_pipe.Reader.Complete();
_readComplete();
_readerComplete = true;
return 0;
}
var readableBuffer = result.Buffer;
var actual = Math.Min(readableBuffer.Length, count);
readableBuffer = readableBuffer.Slice(0, actual);
readableBuffer.CopyTo(new Span<byte>(buffer, offset, count));
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
return (int)actual;
}
finally
{
registration.Dispose();
}
}
// Called under write-lock.
private Task FirstWriteAsync()
{
if (_firstWrite)
{
_firstWrite = false;
return _onFirstWriteAsync();
}
return Task.FromResult(true);
}
// Write with count 0 will still trigger OnFirstWrite
public override void Write(byte[] buffer, int offset, int count)
{
if (!_allowSynchronousIO())
{
throw new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set AllowSynchronousIO to true.");
}
// The Pipe Write method requires calling FlushAsync to notify the reader. Call WriteAsync instead.
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count, allowEmpty: true);
CheckNotComplete();
await _writeLock.WaitAsync(cancellationToken);
try
{
await FirstWriteAsync();
await _pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
finally
{
_writeLock.Release();
}
}
private static void VerifyBuffer(byte[] buffer, int offset, int count, bool allowEmpty)
{
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if (offset < 0 || offset > buffer.Length)
{
throw new ArgumentOutOfRangeException("offset", offset, string.Empty);
}
if (count < 0 || count > buffer.Length - offset
|| (!allowEmpty && count == 0))
{
throw new ArgumentOutOfRangeException("count", count, string.Empty);
}
}
internal void Cancel()
{
_aborted = true;
_abortException = new OperationCanceledException();
_complete = true;
_pipe.Writer.Complete(_abortException);
}
internal void Abort(Exception innerException)
{
Contract.Requires(innerException != null);
_aborted = true;
_abortException = innerException;
_complete = true;
_pipe.Writer.Complete(new IOException(string.Empty, innerException));
}
internal void CompleteWrites()
{
// If HttpClient.Dispose gets called while HttpClient.SetTask...() is called
// there is a chance that this method will be called twice and hang on the lock
// to prevent this we can check if there is already a thread inside the lock
if (_complete)
{
return;
}
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
_complete = true;
_pipe.Writer.Complete();
}
private void CheckAborted()
{
if (_aborted)
{
throw new IOException(string.Empty, _abortException);
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_abortRequest();
}
base.Dispose(disposing);
}
private void CheckNotComplete()
{
if (_complete)
{
throw new IOException("The request was aborted or the pipeline has finished");
}
}
}
}

View File

@ -0,0 +1,64 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Hosting;
using Xunit;
namespace Microsoft.AspNetCore.TestHost.Tests
{
public class ResponseBodyTests
{
[Fact]
public async Task BodyWriter_GetMemoryAdvance_AutoCompleted()
{
var length = -1;
using var host = await CreateHost(httpContext =>
{
var writer = httpContext.Response.BodyWriter;
length = writer.GetMemory().Length;
writer.Advance(length);
return Task.CompletedTask;
});
var response = await host.GetTestServer().CreateClient().GetAsync("/");
var bytes = await response.Content.ReadAsByteArrayAsync();
Assert.Equal(length, bytes.Length);
}
[Fact]
public async Task BodyWriter_StartAsyncGetMemoryAdvance_AutoCompleted()
{
var length = -1;
using var host = await CreateHost(async httpContext =>
{
await httpContext.Response.StartAsync();
var writer = httpContext.Response.BodyWriter;
length = writer.GetMemory().Length;
writer.Advance(length);
});
var response = await host.GetTestServer().CreateClient().GetAsync("/");
var bytes = await response.Content.ReadAsByteArrayAsync();
Assert.Equal(length, bytes.Length);
}
private Task<IHost> CreateHost(RequestDelegate appDelegate)
{
return new HostBuilder()
.ConfigureWebHost(webBuilder =>
{
webBuilder
.UseTestServer()
.Configure(app =>
{
app.Run(appDelegate);
});
})
.StartAsync();
}
}
}