From bff1d0e6c4cf04b0c6c0f3c6f749eb6e206247d5 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 15 May 2019 11:30:48 -0700 Subject: [PATCH] Work around potential race in PipeWriter (#10165) - Make sure we always await the last flush task before calling FlushAsync again instead of preemptively calling FlushAsync and checking to see if the ValueTask is incomplete before bothering to acquire the _flushLock - This now acquires the _flushLock fore every call to Response.Body.Write whereas this only happened for truly async writes before. - I don't think this is a big concern since this should normally be uncontested, and DefaultPipeWriter.FlushAsync/GetResult already acquire a lock. --- .../Infrastructure/TimingPipeFlusher.cs | 48 ++++--- .../Core/test/TimingPipeFlusherTests.cs | 67 ++++++++++ .../test/LibuvOutputConsumerTests.cs | 32 ++--- .../Http1LargeWritingBenchmark.cs | 118 ++++++++++++++++++ .../Http1WritingBenchmark.cs | 14 ++- 5 files changed, 241 insertions(+), 38 deletions(-) create mode 100644 src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs create mode 100644 src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs index e9c66b40b6..dfdc04f485 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs @@ -51,35 +51,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure public ValueTask FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) { - var flushValueTask = _writer.FlushAsync(cancellationToken); - - if (minRate != null) - { - _timeoutControl.BytesWrittenToBuffer(minRate, count); - } - - if (flushValueTask.IsCompletedSuccessfully) - { - return new ValueTask(flushValueTask.Result); - } - // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush. - // While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes, - // it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case, - // we find previous flush Task which still accounts for any newly committed bytes and await that. lock (_flushLock) { - if (_lastFlushTask == null || _lastFlushTask.IsCompleted) + if (_lastFlushTask != null && !_lastFlushTask.IsCompleted) { - _lastFlushTask = flushValueTask.AsTask(); + _lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken); + return new ValueTask(_lastFlushTask); } return TimeFlushAsync(minRate, count, outputAborter, cancellationToken); } } - private async ValueTask TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + private ValueTask TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + { + var pipeFlushTask = _writer.FlushAsync(cancellationToken); + + if (minRate != null) + { + _timeoutControl.BytesWrittenToBuffer(minRate, count); + } + + if (pipeFlushTask.IsCompletedSuccessfully) + { + return new ValueTask(pipeFlushTask.Result); + } + + _lastFlushTask = TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken); + return new ValueTask(_lastFlushTask); + } + + private async Task TimeFlushAsyncAwaited(ValueTask pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) { if (minRate != null) { @@ -88,7 +92,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure try { - return await _lastFlushTask; + return await pipeFlushTask; } catch (OperationCanceledException ex) when (outputAborter != null) { @@ -111,5 +115,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure return default; } + + private async Task AwaitLastFlushAndTimeFlushAsync(Task lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) + { + await lastFlushTask; + return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken); + } } } diff --git a/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs b/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs new file mode 100644 index 0000000000..400d390039 --- /dev/null +++ b/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs @@ -0,0 +1,67 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Moq; +using Xunit; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests +{ + public class TimingPipeFlusherTests + { + [Fact] + public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall() + { + var mockPipeWriter = new Mock(); + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + }; + var pipeWriterFlushCallCount = 0; + + mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() => + { + return new ValueTask(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task); + }); + + var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null); + + var flushTask0 = timingPipeFlusher.FlushAsync(); + var flushTask1 = timingPipeFlusher.FlushAsync(); + var flushTask2 = timingPipeFlusher.FlushAsync(); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.Equal(1, pipeWriterFlushCallCount); + + pipeWriterFlushTcsArray[0].SetResult(default); + await flushTask0.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.True(pipeWriterFlushCallCount <= 2); + + pipeWriterFlushTcsArray[1].SetResult(default); + await flushTask1.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.True(flushTask1.IsCompleted); + Assert.False(flushTask2.IsCompleted); + Assert.True(pipeWriterFlushCallCount <= 3); + + pipeWriterFlushTcsArray[2].SetResult(default); + await flushTask2.AsTask().DefaultTimeout(); + + Assert.True(flushTask0.IsCompleted); + Assert.True(flushTask1.IsCompleted); + Assert.True(flushTask2.IsCompleted); + Assert.Equal(3, pipeWriterFlushCallCount); + } + } +} diff --git a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs index d72e6485b5..e3a4bf3898 100644 --- a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs +++ b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs @@ -553,15 +553,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task1Waits.IsFaulted); // following tasks should wait. - var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token); + var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; - // Third task is not completed - Assert.False(task3Canceled.IsCompleted); - Assert.False(task3Canceled.IsCanceled); - Assert.False(task3Canceled.IsFaulted); + // Second task is not completed + Assert.False(task2Canceled.IsCompleted); + Assert.False(task2Canceled.IsCanceled); + Assert.False(task2Canceled.IsFaulted); abortedSource.Cancel(); @@ -571,29 +571,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } - // First task is completed + // First task is completed Assert.True(task1Waits.IsCompleted); Assert.False(task1Waits.IsCanceled); Assert.False(task1Waits.IsFaulted); - // A final write guarantees that the error is observed by OutputProducer, - // but doesn't return a canceled/faulted task. - var task4Success = outputProducer.WriteDataAsync(fullBuffer); - Assert.True(task4Success.IsCompleted); - Assert.False(task4Success.IsCanceled); - Assert.False(task4Success.IsFaulted); + // Second task is now canceled + await Assert.ThrowsAsync(() => task2Canceled); + Assert.True(task2Canceled.IsCanceled); - // Third task is now canceled - await Assert.ThrowsAsync(() => task3Canceled); - Assert.True(task3Canceled.IsCanceled); + // A final write can still succeed. + var task3Success = outputProducer.WriteDataAsync(fullBuffer); await _mockLibuv.OnPostTask; - // Complete the 4th write + // Complete the 3rd write while (completeQueue.TryDequeue(out var triggerNextCompleted)) { await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } + + Assert.True(task3Success.IsCompleted); + Assert.False(task3Success.IsCanceled); + Assert.False(task3Success.IsFaulted);; } }); } diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs new file mode 100644 index 0000000000..518a7a4d81 --- /dev/null +++ b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1LargeWritingBenchmark.cs @@ -0,0 +1,118 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; +using Microsoft.AspNetCore.Testing; + +namespace Microsoft.AspNetCore.Server.Kestrel.Performance +{ + public class Http1LargeWritingBenchmark + { + private TestHttp1Connection _http1Connection; + private DuplexPipe.DuplexPipePair _pair; + private MemoryPool _memoryPool; + private Task _consumeResponseBodyTask; + + // Keep this divisable by 10 so it can be evenly segmented. + private readonly byte[] _writeData = new byte[10 * 1024 * 1024]; + + [GlobalSetup] + public void GlobalSetup() + { + _memoryPool = KestrelMemoryPool.Create(); + _http1Connection = MakeHttp1Connection(); + _consumeResponseBodyTask = ConsumeResponseBody(); + } + + [IterationSetup] + public void Setup() + { + _http1Connection.Reset(); + _http1Connection.RequestHeaders.ContentLength = _writeData.Length; + _http1Connection.FlushAsync().GetAwaiter().GetResult(); + } + + [Benchmark] + public Task WriteAsync() + { + return _http1Connection.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default); + } + + [Benchmark] + public Task WriteSegmentsUnawaitedAsync() + { + // Write a 10th the of the data at a time + var segmentSize = _writeData.Length / 10; + + for (int i = 0; i < 9; i++) + { + // Ignore the first nine tasks. + _ = _http1Connection.ResponseBody.WriteAsync(_writeData, i * segmentSize, segmentSize, default); + } + + return _http1Connection.ResponseBody.WriteAsync(_writeData, 9 * segmentSize, segmentSize, default); + } + + private TestHttp1Connection MakeHttp1Connection() + { + var options = new PipeOptions(_memoryPool, useSynchronizationContext: false); + var pair = DuplexPipe.CreateConnectionPair(options, options); + _pair = pair; + + var serviceContext = new ServiceContext + { + DateHeaderValueManager = new DateHeaderValueManager(), + ServerOptions = new KestrelServerOptions(), + Log = new MockTrace(), + HttpParser = new HttpParser() + }; + + var http1Connection = new TestHttp1Connection(new HttpConnectionContext + { + ServiceContext = serviceContext, + ConnectionFeatures = new FeatureCollection(), + MemoryPool = _memoryPool, + TimeoutControl = new TimeoutControl(timeoutHandler: null), + Transport = pair.Transport + }); + + http1Connection.Reset(); + http1Connection.InitializeBodyControl(MessageBody.ZeroContentLengthKeepAlive); + serviceContext.DateHeaderValueManager.OnHeartbeat(DateTimeOffset.UtcNow); + + return http1Connection; + } + + private async Task ConsumeResponseBody() + { + var reader = _pair.Application.Input; + var readResult = await reader.ReadAsync(); + + while (!readResult.IsCompleted) + { + reader.AdvanceTo(readResult.Buffer.End); + readResult = await reader.ReadAsync(); + } + + reader.Complete(); + } + + [GlobalCleanup] + public void Dispose() + { + _pair.Transport.Output.Complete(); + _consumeResponseBodyTask.GetAwaiter().GetResult(); + _memoryPool?.Dispose(); + } + } +} diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs index d0c1cf3370..ff6023fd18 100644 --- a/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs +++ b/src/Servers/Kestrel/perf/Kestrel.Performance/Http1WritingBenchmark.cs @@ -29,6 +29,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance private TestHttp1Connection _http1Connection; private DuplexPipe.DuplexPipePair _pair; private MemoryPool _memoryPool; + private Task _consumeResponseBodyTask; private readonly byte[] _writeData = Encoding.ASCII.GetBytes("Hello, World!"); @@ -37,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance { _memoryPool = KestrelMemoryPool.Create(); _http1Connection = MakeHttp1Connection(); + _consumeResponseBodyTask = ConsumeResponseBody(); } [Params(true, false)] @@ -125,14 +127,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance return http1Connection; } - [IterationCleanup] - public void Cleanup() + private async Task ConsumeResponseBody() { var reader = _pair.Application.Input; - if (reader.TryRead(out var readResult)) + var readResult = await reader.ReadAsync(); + + while (!readResult.IsCompleted) { reader.AdvanceTo(readResult.Buffer.End); + readResult = await reader.ReadAsync(); } + + reader.Complete(); } public enum Startup @@ -145,6 +151,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance [GlobalCleanup] public void Dispose() { + _pair.Transport.Output.Complete(); + _consumeResponseBodyTask.GetAwaiter().GetResult(); _memoryPool?.Dispose(); } }