diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs index e9c66b40b6..dfdc04f485 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs @@ -51,35 +51,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure public ValueTask FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) { - var flushValueTask = _writer.FlushAsync(cancellationToken); - - if (minRate != null) - { - _timeoutControl.BytesWrittenToBuffer(minRate, count); - } - - if (flushValueTask.IsCompletedSuccessfully) - { - return new ValueTask(flushValueTask.Result); - } - // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush. - // While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes, - // it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case, - // we find previous flush Task which still accounts for any newly committed bytes and await that. lock (_flushLock) { - if (_lastFlushTask == null || _lastFlushTask.IsCompleted) + if (_lastFlushTask != null && !_lastFlushTask.IsCompleted) { - _lastFlushTask = flushValueTask.AsTask(); + _lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken); + return new ValueTask(_lastFlushTask); } return TimeFlushAsync(minRate, count, outputAborter, cancellationToken); } } - private async ValueTask TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + private ValueTask TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + { + var pipeFlushTask = _writer.FlushAsync(cancellationToken); + + if (minRate != null) + { + _timeoutControl.BytesWrittenToBuffer(minRate, count); + } + + if (pipeFlushTask.IsCompletedSuccessfully) + { + return new ValueTask(pipeFlushTask.Result); + } + + _lastFlushTask = TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken); + return new ValueTask(_lastFlushTask); + } + + private async Task TimeFlushAsyncAwaited(ValueTask pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) { if (minRate != null) { @@ -88,7 +92,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure try { - return await _lastFlushTask; + return await pipeFlushTask; } catch (OperationCanceledException ex) when (outputAborter != null) { @@ -111,5 +115,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure return default; } + + private async Task AwaitLastFlushAndTimeFlushAsync(Task lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + { + await lastFlushTask; + return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken); + } } } diff --git a/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs b/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs new file mode 100644 index 0000000000..400d390039 --- /dev/null +++ b/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs @@ -0,0 +1,67 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Moq; +using Xunit; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests +{ + public class TimingPipeFlusherTests + { + [Fact] + public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall() + { + var mockPipeWriter = new Mock(); + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + }; + var pipeWriterFlushCallCount = 0; + + mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() => + { + return new ValueTask(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task); + }); + + var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null); + + var flushTask0 = timingPipeFlusher.FlushAsync(); + var flushTask1 = timingPipeFlusher.FlushAsync(); + var flushTask2 = timingPipeFlusher.FlushAsync(); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.Equal(1, pipeWriterFlushCallCount); + + pipeWriterFlushTcsArray[0].SetResult(default); + await flushTask0.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.True(pipeWriterFlushCallCount <= 2); + + pipeWriterFlushTcsArray[1].SetResult(default); + await flushTask1.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.True(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.True(pipeWriterFlushCallCount <= 3); + + pipeWriterFlushTcsArray[2].SetResult(default); + await flushTask2.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.True(flushTask1.IsCompleted); + Assert.True(flushTask2.IsCompleted); + Assert.Equal(3, pipeWriterFlushCallCount); + } + } +} diff --git a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs index d72e6485b5..e3a4bf3898 100644 --- a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs +++ b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs @@ -553,15 +553,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task1Waits.IsFaulted); // following tasks should wait. - var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token); + var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); + // Second task is not completed + Assert.False(task2Canceled.IsCompleted); + Assert.False(task2Canceled.IsCanceled); + Assert.False(task2Canceled.IsFaulted); abortedSource.Cancel(); @@ -571,29 +571,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } - // First task is completed + // First task is completed Assert.True(task1Waits.IsCompleted); Assert.False(task1Waits.IsCanceled); Assert.False(task1Waits.IsFaulted); - // A final write guarantees that the error is observed by OutputProducer, - // but doesn't return a canceled/faulted task. - var task4Success = outputProducer.WriteDataAsync(fullBuffer); - Assert.True(task4Success.IsCompleted); - Assert.False(task4Success.IsCanceled); - Assert.False(task4Success.IsFaulted); + // Second task is now canceled + await Assert.ThrowsAsync(() => task2Canceled); + Assert.True(task2Canceled.IsCanceled); - // Third task is now canceled - await Assert.ThrowsAsync(() => task3Canceled); - Assert.True(task3Canceled.IsCanceled); + // A final write can still succeed. + var task3Success = outputProducer.WriteDataAsync(fullBuffer); await _mockLibuv.OnPostTask; - // Complete the 4th write + // Complete the 3rd write while (completeQueue.TryDequeue(out var triggerNextCompleted)) { await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } + + Assert.True(task3Success.IsCompleted); + Assert.False(task3Success.IsCanceled); + Assert.False(task3Success.IsFaulted);; } }); } diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs new file mode 100644 index 0000000000..518a7a4d81 --- /dev/null +++ b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs @@ -0,0 +1,118 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; +using Microsoft.AspNetCore.Testing; + +namespace Microsoft.AspNetCore.Server.Kestrel.Performance +{ + public class Http1LargeWritingBenchmark + { + private TestHttp1Connection _http1Connection; + private DuplexPipe.DuplexPipePair _pair; + private MemoryPool _memoryPool; + private Task _consumeResponseBodyTask; + + // Keep this divisable by 10 so it can be evenly segmented. + private readonly byte[] _writeData = new byte[10 * 1024 * 1024]; + + [GlobalSetup] + public void GlobalSetup() + { + _memoryPool = KestrelMemoryPool.Create(); + _http1Connection = MakeHttp1Connection(); + _consumeResponseBodyTask = ConsumeResponseBody(); + } + + [IterationSetup] + public void Setup() + { + _http1Connection.Reset(); + _http1Connection.RequestHeaders.ContentLength = _writeData.Length; + _http1Connection.FlushAsync().GetAwaiter().GetResult(); + } + + [Benchmark] + public Task WriteAsync() + { + return _http1Connection.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default); + } + + [Benchmark] + public Task WriteSegmentsUnawaitedAsync() + { + // Write a 10th the of the data at a time + var segmentSize = _writeData.Length / 10; + + for (int i = 0; i < 9; i++) + { + // Ignore the first nine tasks. + _ = _http1Connection.ResponseBody.WriteAsync(_writeData, i * segmentSize, segmentSize, default); + } + + return _http1Connection.ResponseBody.WriteAsync(_writeData, 9 * segmentSize, segmentSize, default); + } + + private TestHttp1Connection MakeHttp1Connection() + { + var options = new PipeOptions(_memoryPool, useSynchronizationContext: false); + var pair = DuplexPipe.CreateConnectionPair(options, options); + _pair = pair; + + var serviceContext = new ServiceContext + { + DateHeaderValueManager = new DateHeaderValueManager(), + ServerOptions = new KestrelServerOptions(), + Log = new MockTrace(), + HttpParser = new HttpParser() + }; + + var http1Connection = new TestHttp1Connection(new HttpConnectionContext + { + ServiceContext = serviceContext, + ConnectionFeatures = new FeatureCollection(), + MemoryPool = _memoryPool, + TimeoutControl = new TimeoutControl(timeoutHandler: null), + Transport = pair.Transport + }); + + http1Connection.Reset(); + http1Connection.InitializeBodyControl(MessageBody.ZeroContentLengthKeepAlive); + serviceContext.DateHeaderValueManager.OnHeartbeat(DateTimeOffset.UtcNow); + + return http1Connection; + } + + private async Task ConsumeResponseBody() + { + var reader = _pair.Application.Input; + var readResult = await reader.ReadAsync(); + + while (!readResult.IsCompleted) + { + reader.AdvanceTo(readResult.Buffer.End); + readResult = await reader.ReadAsync(); + } + + reader.Complete(); + } + + [GlobalCleanup] + public void Dispose() + { + _pair.Transport.Output.Complete(); + _consumeResponseBodyTask.GetAwaiter().GetResult(); + _memoryPool?.Dispose(); + } + } +} diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs index d0c1cf3370..ff6023fd18 100644 --- a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs +++ b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs @@ -29,6 +29,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance private TestHttp1Connection _http1Connection; private DuplexPipe.DuplexPipePair _pair; private MemoryPool _memoryPool; + private Task _consumeResponseBodyTask; private readonly byte[] _writeData = Encoding.ASCII.GetBytes("Hello, World!"); @@ -37,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance { _memoryPool = KestrelMemoryPool.Create(); _http1Connection = MakeHttp1Connection(); + _consumeResponseBodyTask = ConsumeResponseBody(); } [Params(true, false)] @@ -125,14 +127,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance return http1Connection; } - [IterationCleanup] - public void Cleanup() + private async Task ConsumeResponseBody() { var reader = _pair.Application.Input; - if (reader.TryRead(out var readResult)) + var readResult = await reader.ReadAsync(); + + while (!readResult.IsCompleted) { reader.AdvanceTo(readResult.Buffer.End); + readResult = await reader.ReadAsync(); } + + reader.Complete(); } public enum Startup @@ -145,6 +151,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance [GlobalCleanup] public void Dispose() { + _pair.Transport.Output.Complete(); + _consumeResponseBodyTask.GetAwaiter().GetResult(); _memoryPool?.Dispose(); } }