Convert TestHost ResponseStream to use Pipes.

This commit is contained in:
Chris Ross (ASP.NET) 2018-03-05 15:10:42 -08:00
parent f0949b7ebb
commit b6dc00229d
8 changed files with 112 additions and 206 deletions

View File

@ -16,6 +16,8 @@
<SignAssembly>true</SignAssembly>
<PublicSign Condition="'$(OS)' != 'Windows_NT'">true</PublicSign>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<!-- https://github.com/aspnet/BuildTools/issues/592 -->
<EnableApiCheck>false</EnableApiCheck>
</PropertyGroup>
</Project>

View File

@ -40,6 +40,7 @@
<SerilogExtensionsLoggingPackageVersion>1.4.0</SerilogExtensionsLoggingPackageVersion>
<SerilogSinksFilePackageVersion>3.2.0</SerilogSinksFilePackageVersion>
<SystemDiagnosticsDiagnosticSourcePackageVersion>4.5.0-preview2-26224-02</SystemDiagnosticsDiagnosticSourcePackageVersion>
<SystemIOPipelinesPackageVersion>4.5.0-preview2-26224-02</SystemIOPipelinesPackageVersion>
<SystemReflectionMetadataPackageVersion>1.6.0-preview2-26126-03</SystemReflectionMetadataPackageVersion>
<SystemServiceProcessServiceControllerPackageVersion>4.5.0-preview2-26224-02</SystemServiceProcessServiceControllerPackageVersion>
<XunitPackageVersion>2.3.1</XunitPackageVersion>

View File

@ -90,14 +90,14 @@ namespace Microsoft.AspNetCore.TestHost
{
_requestAbortedSource.Cancel();
}
_responseStream.Complete();
_responseStream.CompleteWrites();
}
internal async Task CompleteResponseAsync()
{
_pipelineFinished = true;
await ReturnResponseMessageAsync();
_responseStream.Complete();
_responseStream.CompleteWrites();
await _responseFeature.FireOnResponseCompletedAsync();
}

View File

@ -18,4 +18,8 @@
<ProjectReference Include="..\Microsoft.AspNetCore.Hosting\Microsoft.AspNetCore.Hosting.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.IO.Pipelines" Version="$(SystemIOPipelinesPackageVersion)" />
</ItemGroup>
</Project>

View File

@ -2,9 +2,11 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
@ -17,38 +19,20 @@ namespace Microsoft.AspNetCore.TestHost
private bool _complete;
private bool _aborted;
private Exception _abortException;
private ConcurrentQueue<byte[]> _bufferedData;
private ArraySegment<byte> _topBuffer;
private SemaphoreSlim _readLock;
private SemaphoreSlim _writeLock;
private TaskCompletionSource<object> _readWaitingForData;
private object _signalReadLock;
private Func<Task> _onFirstWriteAsync;
private bool _firstWrite;
private Action _abortRequest;
private Pipe _pipe = new Pipe();
internal ResponseStream(Func<Task> onFirstWriteAsync, Action abortRequest)
{
if (onFirstWriteAsync == null)
{
throw new ArgumentNullException(nameof(onFirstWriteAsync));
}
if (abortRequest == null)
{
throw new ArgumentNullException(nameof(abortRequest));
}
_onFirstWriteAsync = onFirstWriteAsync;
_onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync));
_abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest));
_firstWrite = true;
_abortRequest = abortRequest;
_readLock = new SemaphoreSlim(1, 1);
_writeLock = new SemaphoreSlim(1, 1);
_bufferedData = new ConcurrentQueue<byte[]>();
_readWaitingForData = new TaskCompletionSource<object>();
_signalReadLock = new object();
}
public override bool CanRead
@ -93,126 +77,70 @@ namespace Microsoft.AspNetCore.TestHost
public override void Flush()
{
FlushAsync().GetAwaiter().GetResult();
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
CheckNotComplete();
_writeLock.Wait();
await _writeLock.WaitAsync(cancellationToken);
try
{
FirstWriteAsync().GetAwaiter().GetResult();
await FirstWriteAsync();
await _pipe.Writer.FlushAsync(cancellationToken);
}
finally
{
_writeLock.Release();
}
// TODO: Wait for data to drain?
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
tcs.TrySetCanceled();
return tcs.Task;
}
Flush();
// TODO: Wait for data to drain?
return Task.FromResult<object>(null);
}
public override int Read(byte[] buffer, int offset, int count)
{
VerifyBuffer(buffer, offset, count, allowEmpty: false);
_readLock.Wait();
try
{
int totalRead = 0;
do
{
// Don't drain buffered data when signaling an abort.
CheckAborted();
if (_topBuffer.Count <= 0)
{
byte[] topBuffer = null;
while (!_bufferedData.TryDequeue(out topBuffer))
{
if (_complete)
{
CheckAborted();
// Graceful close
return totalRead;
}
WaitForDataAsync().Wait();
}
_topBuffer = new ArraySegment<byte>(topBuffer);
}
int actualCount = Math.Min(count, _topBuffer.Count);
Buffer.BlockCopy(_topBuffer.Array, _topBuffer.Offset, buffer, offset, actualCount);
_topBuffer = new ArraySegment<byte>(_topBuffer.Array,
_topBuffer.Offset + actualCount,
_topBuffer.Count - actualCount);
totalRead += actualCount;
offset += actualCount;
count -= actualCount;
}
while (count > 0 && (_topBuffer.Count > 0 || _bufferedData.Count > 0));
// Keep reading while there is more data available and we have more space to put it in.
return totalRead;
}
finally
{
_readLock.Release();
}
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count, allowEmpty: false);
CancellationTokenRegistration registration = cancellationToken.Register(Abort);
await _readLock.WaitAsync(cancellationToken);
CheckAborted();
var registration = cancellationToken.Register(Cancel);
try
{
int totalRead = 0;
do
// TODO: Usability issue. dotnet/corefx#27732 Flush or zero byte write causes ReadAsync to complete without data so I have to call ReadAsync in a loop.
while (true)
{
// Don't drained buffered data on abort.
CheckAborted();
if (_topBuffer.Count <= 0)
var result = await _pipe.Reader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
if (!readableBuffer.IsEmpty)
{
byte[] topBuffer = null;
while (!_bufferedData.TryDequeue(out topBuffer))
{
if (_complete)
{
CheckAborted();
// Graceful close
return totalRead;
}
await WaitForDataAsync();
}
_topBuffer = new ArraySegment<byte>(topBuffer);
var actual = Math.Min(readableBuffer.Length, count);
readableBuffer = readableBuffer.Slice(0, actual);
readableBuffer.CopyTo(new Span<byte>(buffer, offset, count));
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
return (int)actual;
}
int actualCount = Math.Min(count, _topBuffer.Count);
Buffer.BlockCopy(_topBuffer.Array, _topBuffer.Offset, buffer, offset, actualCount);
_topBuffer = new ArraySegment<byte>(_topBuffer.Array,
_topBuffer.Offset + actualCount,
_topBuffer.Count - actualCount);
totalRead += actualCount;
offset += actualCount;
count -= actualCount;
if (result.IsCompleted)
{
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); // TODO: Remove after https://github.com/dotnet/corefx/pull/27596
_pipe.Reader.Complete();
return 0;
}
cancellationToken.ThrowIfCancellationRequested();
Debug.Assert(!result.IsCanceled); // It should only be canceled by cancellationToken.
// Try again. TODO: dotnet/corefx#27732 I shouldn't need to do this, there wasn't any data.
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
while (count > 0 && (_topBuffer.Count > 0 || _bufferedData.Count > 0));
// Keep reading while there is more data available and we have more space to put it in.
return totalRead;
}
finally
{
registration.Dispose();
_readLock.Release();
}
}
@ -229,24 +157,21 @@ namespace Microsoft.AspNetCore.TestHost
// Write with count 0 will still trigger OnFirstWrite
public override void Write(byte[] buffer, int offset, int count)
{
// The Pipe Write method requires calling FlushAsync to notify the reader. Call WriteAsync instead.
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count, allowEmpty: true);
CheckNotComplete();
_writeLock.Wait();
await _writeLock.WaitAsync(cancellationToken);
try
{
FirstWriteAsync().GetAwaiter().GetResult();
if (count == 0)
{
return;
}
// Copies are necessary because we don't know what the caller is going to do with the buffer afterwards.
byte[] internalBuffer = new byte[count];
Buffer.BlockCopy(buffer, offset, internalBuffer, 0, count);
_bufferedData.Enqueue(internalBuffer);
SignalDataAvailable();
await FirstWriteAsync();
await _pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
finally
{
@ -254,37 +179,6 @@ namespace Microsoft.AspNetCore.TestHost
}
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
Write(buffer, offset, count);
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(state);
tcs.TrySetResult(null);
IAsyncResult result = tcs.Task;
if (callback != null)
{
callback(result);
}
return result;
}
public override void EndWrite(IAsyncResult asyncResult)
{
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
VerifyBuffer(buffer, offset, count, allowEmpty: true);
if (cancellationToken.IsCancellationRequested)
{
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
tcs.TrySetCanceled();
return tcs.Task;
}
Write(buffer, offset, count);
return Task.FromResult<object>(null);
}
private static void VerifyBuffer(byte[] buffer, int offset, int count, bool allowEmpty)
{
if (buffer == null)
@ -302,32 +196,12 @@ namespace Microsoft.AspNetCore.TestHost
}
}
private void SignalDataAvailable()
internal void Cancel()
{
// Dispatch, as TrySetResult will synchronously execute the waiters callback and block our Write.
Task.Factory.StartNew(() => _readWaitingForData.TrySetResult(null));
}
private Task WaitForDataAsync()
{
// Prevent race with Dispose
lock (_signalReadLock)
{
_readWaitingForData = new TaskCompletionSource<object>();
if (!_bufferedData.IsEmpty || _complete)
{
// Race, data could have arrived before we created the TCS.
_readWaitingForData.TrySetResult(null);
}
return _readWaitingForData.Task;
}
}
internal void Abort()
{
Abort(new OperationCanceledException());
_aborted = true;
_abortException = new OperationCanceledException();
_complete = true;
_pipe.Writer.Complete(_abortException);
}
internal void Abort(Exception innerException)
@ -335,10 +209,11 @@ namespace Microsoft.AspNetCore.TestHost
Contract.Requires(innerException != null);
_aborted = true;
_abortException = innerException;
Complete();
_complete = true;
_pipe.Writer.Complete(new IOException(string.Empty, innerException));
}
internal void Complete()
internal void CompleteWrites()
{
// If HttpClient.Dispose gets called while HttpClient.SetTask...() is called
// there is a chance that this method will be called twice and hang on the lock
@ -348,13 +223,9 @@ namespace Microsoft.AspNetCore.TestHost
return;
}
// Prevent race with WaitForDataAsync
lock (_signalReadLock)
{
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
_complete = true;
_readWaitingForData.TrySetResult(null);
}
// Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads.
_complete = true;
_pipe.Writer.Complete();
}
private void CheckAborted()

View File

@ -210,8 +210,7 @@ namespace Microsoft.AspNetCore.TestHost
Task<int> readTask = responseStream.ReadAsync(new byte[100], 0, 100);
Assert.False(readTask.IsCompleted);
responseStream.Dispose();
Thread.Sleep(50);
Assert.True(readTask.IsCompleted);
Assert.True(readTask.Wait(TimeSpan.FromSeconds(10)), "Finished");
Assert.Equal(0, readTask.Result);
block.Set();
}
@ -234,11 +233,10 @@ namespace Microsoft.AspNetCore.TestHost
Stream responseStream = await response.Content.ReadAsStreamAsync();
CancellationTokenSource cts = new CancellationTokenSource();
Task<int> readTask = responseStream.ReadAsync(new byte[100], 0, 100, cts.Token);
Assert.False(readTask.IsCompleted);
Assert.False(readTask.IsCompleted, "Not Completed");
cts.Cancel();
Thread.Sleep(50);
Assert.True(readTask.IsCompleted);
Assert.True(readTask.IsFaulted);
var ex = Assert.Throws<AggregateException>(() => readTask.Wait(TimeSpan.FromSeconds(10)));
Assert.IsAssignableFrom<OperationCanceledException>(ex.GetBaseException());
block.Set();
}

View File

@ -3,6 +3,7 @@
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
@ -99,7 +100,34 @@ namespace Microsoft.AspNetCore.TestHost
}
[Fact]
public async Task HeadersAvailableBeforeBodyFinished()
public async Task HeadersAvailableBeforeSyncBodyFinished()
{
var block = new ManualResetEvent(false);
var builder = new WebHostBuilder().Configure(app =>
{
app.Run(c =>
{
c.Response.Headers["TestHeader"] = "TestValue";
var bytes = Encoding.UTF8.GetBytes("BodyStarted" + Environment.NewLine);
c.Response.Body.Write(bytes, 0, bytes.Length);
Assert.True(block.WaitOne(TimeSpan.FromSeconds(5)));
bytes = Encoding.UTF8.GetBytes("BodyFinished");
c.Response.Body.Write(bytes, 0, bytes.Length);
return Task.CompletedTask;
});
});
var server = new TestServer(builder);
var context = await server.SendAsync(c => { });
Assert.Equal("TestValue", context.Response.Headers["TestHeader"]);
var reader = new StreamReader(context.Response.Body);
Assert.Equal("BodyStarted", reader.ReadLine());
block.Set();
Assert.Equal("BodyFinished", reader.ReadToEnd());
}
[Fact]
public async Task HeadersAvailableBeforeAsyncBodyFinished()
{
var block = new ManualResetEvent(false);
var builder = new WebHostBuilder().Configure(app =>
@ -107,8 +135,8 @@ namespace Microsoft.AspNetCore.TestHost
app.Run(async c =>
{
c.Response.Headers["TestHeader"] = "TestValue";
await c.Response.WriteAsync("BodyStarted,");
block.WaitOne();
await c.Response.WriteAsync("BodyStarted" + Environment.NewLine);
Assert.True(block.WaitOne(TimeSpan.FromSeconds(5)));
await c.Response.WriteAsync("BodyFinished");
});
});
@ -116,8 +144,10 @@ namespace Microsoft.AspNetCore.TestHost
var context = await server.SendAsync(c => { });
Assert.Equal("TestValue", context.Response.Headers["TestHeader"]);
var reader = new StreamReader(context.Response.Body);
Assert.Equal("BodyStarted", await reader.ReadLineAsync());
block.Set();
Assert.Equal("BodyStarted,BodyFinished", new StreamReader(context.Response.Body).ReadToEnd());
Assert.Equal("BodyFinished", await reader.ReadToEndAsync());
}
[Fact]
@ -217,7 +247,7 @@ namespace Microsoft.AspNetCore.TestHost
Assert.False(readTask.IsCompleted);
cts.Cancel();
var ex = Assert.Throws<AggregateException>(() => readTask.Wait(TimeSpan.FromSeconds(10)));
Assert.IsAssignableFrom<OperationCanceledException>(ex.GetBaseException().InnerException);
Assert.IsAssignableFrom<OperationCanceledException>(ex.GetBaseException());
block.Set();
}

View File

@ -246,7 +246,7 @@ namespace Microsoft.AspNetCore.TestHost
var receiveArray = new byte[1024];
while (true)
{
var receiveResult = await websocket.ReceiveAsync(new System.ArraySegment<byte>(receiveArray), CancellationToken.None);
var receiveResult = await websocket.ReceiveAsync(new ArraySegment<byte>(receiveArray), CancellationToken.None);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
await websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal Closure", CancellationToken.None);