diff --git a/Directory.Build.props b/Directory.Build.props index 37ebc0356f..fbb0c081f2 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -16,6 +16,8 @@ true true true + + false diff --git a/build/dependencies.props b/build/dependencies.props index 6268a57dd7..544fc9100f 100644 --- a/build/dependencies.props +++ b/build/dependencies.props @@ -40,6 +40,7 @@ 1.4.0 3.2.0 4.5.0-preview2-26224-02 + 4.5.0-preview2-26224-02 1.6.0-preview2-26126-03 4.5.0-preview2-26224-02 2.3.1 diff --git a/src/Microsoft.AspNetCore.TestHost/HttpContextBuilder.cs b/src/Microsoft.AspNetCore.TestHost/HttpContextBuilder.cs index 4617be5308..6886b1aac4 100644 --- a/src/Microsoft.AspNetCore.TestHost/HttpContextBuilder.cs +++ b/src/Microsoft.AspNetCore.TestHost/HttpContextBuilder.cs @@ -90,14 +90,14 @@ namespace Microsoft.AspNetCore.TestHost { _requestAbortedSource.Cancel(); } - _responseStream.Complete(); + _responseStream.CompleteWrites(); } internal async Task CompleteResponseAsync() { _pipelineFinished = true; await ReturnResponseMessageAsync(); - _responseStream.Complete(); + _responseStream.CompleteWrites(); await _responseFeature.FireOnResponseCompletedAsync(); } diff --git a/src/Microsoft.AspNetCore.TestHost/Microsoft.AspNetCore.TestHost.csproj b/src/Microsoft.AspNetCore.TestHost/Microsoft.AspNetCore.TestHost.csproj index a2fdc53e9c..92a0795d05 100644 --- a/src/Microsoft.AspNetCore.TestHost/Microsoft.AspNetCore.TestHost.csproj +++ b/src/Microsoft.AspNetCore.TestHost/Microsoft.AspNetCore.TestHost.csproj @@ -18,4 +18,8 @@ + + + + diff --git a/src/Microsoft.AspNetCore.TestHost/ResponseStream.cs b/src/Microsoft.AspNetCore.TestHost/ResponseStream.cs index 9471e5e872..0cd3459a80 100644 --- a/src/Microsoft.AspNetCore.TestHost/ResponseStream.cs +++ b/src/Microsoft.AspNetCore.TestHost/ResponseStream.cs @@ -2,9 +2,11 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Concurrent; +using System.Buffers; +using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; +using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; @@ -17,38 +19,20 @@ namespace Microsoft.AspNetCore.TestHost private bool _complete; private bool _aborted; private Exception _abortException; - private ConcurrentQueue _bufferedData; - private ArraySegment _topBuffer; - private SemaphoreSlim _readLock; private SemaphoreSlim _writeLock; - private TaskCompletionSource _readWaitingForData; - private object _signalReadLock; private Func _onFirstWriteAsync; private bool _firstWrite; private Action _abortRequest; + private Pipe _pipe = new Pipe(); + internal ResponseStream(Func onFirstWriteAsync, Action abortRequest) { - if (onFirstWriteAsync == null) - { - throw new ArgumentNullException(nameof(onFirstWriteAsync)); - } - - if (abortRequest == null) - { - throw new ArgumentNullException(nameof(abortRequest)); - } - - _onFirstWriteAsync = onFirstWriteAsync; + _onFirstWriteAsync = onFirstWriteAsync ?? throw new ArgumentNullException(nameof(onFirstWriteAsync)); + _abortRequest = abortRequest ?? throw new ArgumentNullException(nameof(abortRequest)); _firstWrite = true; - _abortRequest = abortRequest; - - _readLock = new SemaphoreSlim(1, 1); _writeLock = new SemaphoreSlim(1, 1); - _bufferedData = new ConcurrentQueue(); - _readWaitingForData = new TaskCompletionSource(); - _signalReadLock = new object(); } public override bool CanRead @@ -93,126 +77,70 @@ namespace Microsoft.AspNetCore.TestHost public override void Flush() { + FlushAsync().GetAwaiter().GetResult(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); CheckNotComplete(); - _writeLock.Wait(); + await _writeLock.WaitAsync(cancellationToken); try { - FirstWriteAsync().GetAwaiter().GetResult(); + await FirstWriteAsync(); + await _pipe.Writer.FlushAsync(cancellationToken); } finally { _writeLock.Release(); } - - // TODO: Wait for data to drain? - } - - public override Task FlushAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - TaskCompletionSource tcs = new TaskCompletionSource(); - tcs.TrySetCanceled(); - return tcs.Task; - } - - Flush(); - - // TODO: Wait for data to drain? - - return Task.FromResult(null); } public override int Read(byte[] buffer, int offset, int count) { - VerifyBuffer(buffer, offset, count, allowEmpty: false); - _readLock.Wait(); - try - { - int totalRead = 0; - do - { - // Don't drain buffered data when signaling an abort. - CheckAborted(); - if (_topBuffer.Count <= 0) - { - byte[] topBuffer = null; - while (!_bufferedData.TryDequeue(out topBuffer)) - { - if (_complete) - { - CheckAborted(); - // Graceful close - return totalRead; - } - WaitForDataAsync().Wait(); - } - _topBuffer = new ArraySegment(topBuffer); - } - int actualCount = Math.Min(count, _topBuffer.Count); - Buffer.BlockCopy(_topBuffer.Array, _topBuffer.Offset, buffer, offset, actualCount); - _topBuffer = new ArraySegment(_topBuffer.Array, - _topBuffer.Offset + actualCount, - _topBuffer.Count - actualCount); - totalRead += actualCount; - offset += actualCount; - count -= actualCount; - } - while (count > 0 && (_topBuffer.Count > 0 || _bufferedData.Count > 0)); - // Keep reading while there is more data available and we have more space to put it in. - return totalRead; - } - finally - { - _readLock.Release(); - } + return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); } public async override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { VerifyBuffer(buffer, offset, count, allowEmpty: false); - CancellationTokenRegistration registration = cancellationToken.Register(Abort); - await _readLock.WaitAsync(cancellationToken); + CheckAborted(); + var registration = cancellationToken.Register(Cancel); try { - int totalRead = 0; - do + // TODO: Usability issue. dotnet/corefx#27732 Flush or zero byte write causes ReadAsync to complete without data so I have to call ReadAsync in a loop. + while (true) { - // Don't drained buffered data on abort. - CheckAborted(); - if (_topBuffer.Count <= 0) + var result = await _pipe.Reader.ReadAsync(cancellationToken); + + var readableBuffer = result.Buffer; + if (!readableBuffer.IsEmpty) { - byte[] topBuffer = null; - while (!_bufferedData.TryDequeue(out topBuffer)) - { - if (_complete) - { - CheckAborted(); - // Graceful close - return totalRead; - } - await WaitForDataAsync(); - } - _topBuffer = new ArraySegment(topBuffer); + var actual = Math.Min(readableBuffer.Length, count); + readableBuffer = readableBuffer.Slice(0, actual); + readableBuffer.CopyTo(new Span(buffer, offset, count)); + _pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); + return (int)actual; } - int actualCount = Math.Min(count, _topBuffer.Count); - Buffer.BlockCopy(_topBuffer.Array, _topBuffer.Offset, buffer, offset, actualCount); - _topBuffer = new ArraySegment(_topBuffer.Array, - _topBuffer.Offset + actualCount, - _topBuffer.Count - actualCount); - totalRead += actualCount; - offset += actualCount; - count -= actualCount; + + if (result.IsCompleted) + { + _pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); // TODO: Remove after https://github.com/dotnet/corefx/pull/27596 + _pipe.Reader.Complete(); + return 0; + } + + cancellationToken.ThrowIfCancellationRequested(); + Debug.Assert(!result.IsCanceled); // It should only be canceled by cancellationToken. + + // Try again. TODO: dotnet/corefx#27732 I shouldn't need to do this, there wasn't any data. + _pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End); } - while (count > 0 && (_topBuffer.Count > 0 || _bufferedData.Count > 0)); - // Keep reading while there is more data available and we have more space to put it in. - return totalRead; } finally { registration.Dispose(); - _readLock.Release(); } } @@ -229,24 +157,21 @@ namespace Microsoft.AspNetCore.TestHost // Write with count 0 will still trigger OnFirstWrite public override void Write(byte[] buffer, int offset, int count) + { + // The Pipe Write method requires calling FlushAsync to notify the reader. Call WriteAsync instead. + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { VerifyBuffer(buffer, offset, count, allowEmpty: true); CheckNotComplete(); - _writeLock.Wait(); + await _writeLock.WaitAsync(cancellationToken); try { - FirstWriteAsync().GetAwaiter().GetResult(); - if (count == 0) - { - return; - } - // Copies are necessary because we don't know what the caller is going to do with the buffer afterwards. - byte[] internalBuffer = new byte[count]; - Buffer.BlockCopy(buffer, offset, internalBuffer, 0, count); - _bufferedData.Enqueue(internalBuffer); - - SignalDataAvailable(); + await FirstWriteAsync(); + await _pipe.Writer.WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken); } finally { @@ -254,37 +179,6 @@ namespace Microsoft.AspNetCore.TestHost } } - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - Write(buffer, offset, count); - TaskCompletionSource tcs = new TaskCompletionSource(state); - tcs.TrySetResult(null); - IAsyncResult result = tcs.Task; - if (callback != null) - { - callback(result); - } - return result; - } - - public override void EndWrite(IAsyncResult asyncResult) - { - } - - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - VerifyBuffer(buffer, offset, count, allowEmpty: true); - if (cancellationToken.IsCancellationRequested) - { - TaskCompletionSource tcs = new TaskCompletionSource(); - tcs.TrySetCanceled(); - return tcs.Task; - } - - Write(buffer, offset, count); - return Task.FromResult(null); - } - private static void VerifyBuffer(byte[] buffer, int offset, int count, bool allowEmpty) { if (buffer == null) @@ -302,32 +196,12 @@ namespace Microsoft.AspNetCore.TestHost } } - private void SignalDataAvailable() + internal void Cancel() { - // Dispatch, as TrySetResult will synchronously execute the waiters callback and block our Write. - Task.Factory.StartNew(() => _readWaitingForData.TrySetResult(null)); - } - - private Task WaitForDataAsync() - { - // Prevent race with Dispose - lock (_signalReadLock) - { - _readWaitingForData = new TaskCompletionSource(); - - if (!_bufferedData.IsEmpty || _complete) - { - // Race, data could have arrived before we created the TCS. - _readWaitingForData.TrySetResult(null); - } - - return _readWaitingForData.Task; - } - } - - internal void Abort() - { - Abort(new OperationCanceledException()); + _aborted = true; + _abortException = new OperationCanceledException(); + _complete = true; + _pipe.Writer.Complete(_abortException); } internal void Abort(Exception innerException) @@ -335,10 +209,11 @@ namespace Microsoft.AspNetCore.TestHost Contract.Requires(innerException != null); _aborted = true; _abortException = innerException; - Complete(); + _complete = true; + _pipe.Writer.Complete(new IOException(string.Empty, innerException)); } - internal void Complete() + internal void CompleteWrites() { // If HttpClient.Dispose gets called while HttpClient.SetTask...() is called // there is a chance that this method will be called twice and hang on the lock @@ -348,13 +223,9 @@ namespace Microsoft.AspNetCore.TestHost return; } - // Prevent race with WaitForDataAsync - lock (_signalReadLock) - { - // Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads. - _complete = true; - _readWaitingForData.TrySetResult(null); - } + // Throw for further writes, but not reads. Allow reads to drain the buffered data and then return 0 for further reads. + _complete = true; + _pipe.Writer.Complete(); } private void CheckAborted() diff --git a/test/Microsoft.AspNetCore.TestHost.Tests/ClientHandlerTests.cs b/test/Microsoft.AspNetCore.TestHost.Tests/ClientHandlerTests.cs index 75250206d0..73f1c86d29 100644 --- a/test/Microsoft.AspNetCore.TestHost.Tests/ClientHandlerTests.cs +++ b/test/Microsoft.AspNetCore.TestHost.Tests/ClientHandlerTests.cs @@ -210,8 +210,7 @@ namespace Microsoft.AspNetCore.TestHost Task readTask = responseStream.ReadAsync(new byte[100], 0, 100); Assert.False(readTask.IsCompleted); responseStream.Dispose(); - Thread.Sleep(50); - Assert.True(readTask.IsCompleted); + Assert.True(readTask.Wait(TimeSpan.FromSeconds(10)), "Finished"); Assert.Equal(0, readTask.Result); block.Set(); } @@ -234,11 +233,10 @@ namespace Microsoft.AspNetCore.TestHost Stream responseStream = await response.Content.ReadAsStreamAsync(); CancellationTokenSource cts = new CancellationTokenSource(); Task readTask = responseStream.ReadAsync(new byte[100], 0, 100, cts.Token); - Assert.False(readTask.IsCompleted); + Assert.False(readTask.IsCompleted, "Not Completed"); cts.Cancel(); - Thread.Sleep(50); - Assert.True(readTask.IsCompleted); - Assert.True(readTask.IsFaulted); + var ex = Assert.Throws(() => readTask.Wait(TimeSpan.FromSeconds(10))); + Assert.IsAssignableFrom(ex.GetBaseException()); block.Set(); } diff --git a/test/Microsoft.AspNetCore.TestHost.Tests/HttpContextBuilderTests.cs b/test/Microsoft.AspNetCore.TestHost.Tests/HttpContextBuilderTests.cs index e7744ef449..21539c8988 100644 --- a/test/Microsoft.AspNetCore.TestHost.Tests/HttpContextBuilderTests.cs +++ b/test/Microsoft.AspNetCore.TestHost.Tests/HttpContextBuilderTests.cs @@ -3,6 +3,7 @@ using System; using System.IO; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; @@ -99,7 +100,34 @@ namespace Microsoft.AspNetCore.TestHost } [Fact] - public async Task HeadersAvailableBeforeBodyFinished() + public async Task HeadersAvailableBeforeSyncBodyFinished() + { + var block = new ManualResetEvent(false); + var builder = new WebHostBuilder().Configure(app => + { + app.Run(c => + { + c.Response.Headers["TestHeader"] = "TestValue"; + var bytes = Encoding.UTF8.GetBytes("BodyStarted" + Environment.NewLine); + c.Response.Body.Write(bytes, 0, bytes.Length); + Assert.True(block.WaitOne(TimeSpan.FromSeconds(5))); + bytes = Encoding.UTF8.GetBytes("BodyFinished"); + c.Response.Body.Write(bytes, 0, bytes.Length); + return Task.CompletedTask; + }); + }); + var server = new TestServer(builder); + var context = await server.SendAsync(c => { }); + + Assert.Equal("TestValue", context.Response.Headers["TestHeader"]); + var reader = new StreamReader(context.Response.Body); + Assert.Equal("BodyStarted", reader.ReadLine()); + block.Set(); + Assert.Equal("BodyFinished", reader.ReadToEnd()); + } + + [Fact] + public async Task HeadersAvailableBeforeAsyncBodyFinished() { var block = new ManualResetEvent(false); var builder = new WebHostBuilder().Configure(app => @@ -107,8 +135,8 @@ namespace Microsoft.AspNetCore.TestHost app.Run(async c => { c.Response.Headers["TestHeader"] = "TestValue"; - await c.Response.WriteAsync("BodyStarted,"); - block.WaitOne(); + await c.Response.WriteAsync("BodyStarted" + Environment.NewLine); + Assert.True(block.WaitOne(TimeSpan.FromSeconds(5))); await c.Response.WriteAsync("BodyFinished"); }); }); @@ -116,8 +144,10 @@ namespace Microsoft.AspNetCore.TestHost var context = await server.SendAsync(c => { }); Assert.Equal("TestValue", context.Response.Headers["TestHeader"]); + var reader = new StreamReader(context.Response.Body); + Assert.Equal("BodyStarted", await reader.ReadLineAsync()); block.Set(); - Assert.Equal("BodyStarted,BodyFinished", new StreamReader(context.Response.Body).ReadToEnd()); + Assert.Equal("BodyFinished", await reader.ReadToEndAsync()); } [Fact] @@ -217,7 +247,7 @@ namespace Microsoft.AspNetCore.TestHost Assert.False(readTask.IsCompleted); cts.Cancel(); var ex = Assert.Throws(() => readTask.Wait(TimeSpan.FromSeconds(10))); - Assert.IsAssignableFrom(ex.GetBaseException().InnerException); + Assert.IsAssignableFrom(ex.GetBaseException()); block.Set(); } diff --git a/test/Microsoft.AspNetCore.TestHost.Tests/TestClientTests.cs b/test/Microsoft.AspNetCore.TestHost.Tests/TestClientTests.cs index 6f1d9098bd..3101c2965f 100644 --- a/test/Microsoft.AspNetCore.TestHost.Tests/TestClientTests.cs +++ b/test/Microsoft.AspNetCore.TestHost.Tests/TestClientTests.cs @@ -246,7 +246,7 @@ namespace Microsoft.AspNetCore.TestHost var receiveArray = new byte[1024]; while (true) { - var receiveResult = await websocket.ReceiveAsync(new System.ArraySegment(receiveArray), CancellationToken.None); + var receiveResult = await websocket.ReceiveAsync(new ArraySegment(receiveArray), CancellationToken.None); if (receiveResult.MessageType == WebSocketMessageType.Close) { await websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal Closure", CancellationToken.None);