Work around potential race in PipeWriter (#10165)

- Make sure we always await the last flush task before calling FlushAsync
  again instead of preemptively calling FlushAsync and checking to see
  if the ValueTask is incomplete before bothering to acquire the _flushLock
- This now acquires the _flushLock fore every call to Response.Body.Write
  whereas this only happened for truly async writes before.
- I don't think this is a big concern since this should normally be uncontested,
  and DefaultPipeWriter.FlushAsync/GetResult already acquire a lock.
This commit is contained in:
Stephen Halter 2019-05-15 11:30:48 -07:00 committed by GitHub
parent fb49ad6e21
commit bff1d0e6c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 38 deletions

View File

@ -51,35 +51,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
var flushValueTask = _writer.FlushAsync(cancellationToken);
if (minRate != null)
{
_timeoutControl.BytesWrittenToBuffer(minRate, count);
}
if (flushValueTask.IsCompletedSuccessfully)
{
return new ValueTask<FlushResult>(flushValueTask.Result);
}
// https://github.com/dotnet/corefxlab/issues/1334
// Pipelines don't support multiple awaiters on flush.
// While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes,
// it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case,
// we find previous flush Task which still accounts for any newly committed bytes and await that.
lock (_flushLock)
{
if (_lastFlushTask == null || _lastFlushTask.IsCompleted)
if (_lastFlushTask != null && !_lastFlushTask.IsCompleted)
{
_lastFlushTask = flushValueTask.AsTask();
_lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken);
return new ValueTask<FlushResult>(_lastFlushTask);
}
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
}
}
private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
var pipeFlushTask = _writer.FlushAsync(cancellationToken);
if (minRate != null)
{
_timeoutControl.BytesWrittenToBuffer(minRate, count);
}
if (pipeFlushTask.IsCompletedSuccessfully)
{
return new ValueTask<FlushResult>(pipeFlushTask.Result);
}
_lastFlushTask = TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken);
return new ValueTask<FlushResult>(_lastFlushTask);
}
private async Task<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
if (minRate != null)
{
@ -88,7 +92,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
try
{
return await _lastFlushTask;
return await pipeFlushTask;
}
catch (OperationCanceledException ex) when (outputAborter != null)
{
@ -111,5 +115,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
return default;
}
private async Task<FlushResult> AwaitLastFlushAndTimeFlushAsync(Task lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
await lastFlushTask;
return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
}
}
}

View File

@ -0,0 +1,67 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class TimingPipeFlusherTests
{
[Fact]
public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall()
{
var mockPipeWriter = new Mock<PipeWriter>();
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var pipeWriterFlushCallCount = 0;
mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() =>
{
return new ValueTask<FlushResult>(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task);
});
var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null);
var flushTask0 = timingPipeFlusher.FlushAsync();
var flushTask1 = timingPipeFlusher.FlushAsync();
var flushTask2 = timingPipeFlusher.FlushAsync();
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.Equal(1, pipeWriterFlushCallCount);
pipeWriterFlushTcsArray[0].SetResult(default);
await flushTask0.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.True(pipeWriterFlushCallCount <= 2);
pipeWriterFlushTcsArray[1].SetResult(default);
await flushTask1.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.True(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.True(pipeWriterFlushCallCount <= 3);
pipeWriterFlushTcsArray[2].SetResult(default);
await flushTask2.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.True(flushTask1.IsCompleted);
Assert.True(flushTask2.IsCompleted);
Assert.Equal(3, pipeWriterFlushCallCount);
}
}
}

View File

@ -553,15 +553,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
Assert.False(task1Waits.IsFaulted);
// following tasks should wait.
var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
// Give time for tasks to percolate
await _mockLibuv.OnPostTask;
// Third task is not completed
Assert.False(task3Canceled.IsCompleted);
Assert.False(task3Canceled.IsCanceled);
Assert.False(task3Canceled.IsFaulted);
// Second task is not completed
Assert.False(task2Canceled.IsCompleted);
Assert.False(task2Canceled.IsCanceled);
Assert.False(task2Canceled.IsFaulted);
abortedSource.Cancel();
@ -571,29 +571,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
// First task is completed
// First task is completed
Assert.True(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
Assert.False(task1Waits.IsFaulted);
// A final write guarantees that the error is observed by OutputProducer,
// but doesn't return a canceled/faulted task.
var task4Success = outputProducer.WriteDataAsync(fullBuffer);
Assert.True(task4Success.IsCompleted);
Assert.False(task4Success.IsCanceled);
Assert.False(task4Success.IsFaulted);
// Second task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task2Canceled);
Assert.True(task2Canceled.IsCanceled);
// Third task is now canceled
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
Assert.True(task3Canceled.IsCanceled);
// A final write can still succeed.
var task3Success = outputProducer.WriteDataAsync(fullBuffer);
await _mockLibuv.OnPostTask;
// Complete the 4th write
// Complete the 3rd write
while (completeQueue.TryDequeue(out var triggerNextCompleted))
{
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
Assert.True(task3Success.IsCompleted);
Assert.False(task3Success.IsCanceled);
Assert.False(task3Success.IsFaulted);;
}
});
}

View File

@ -0,0 +1,118 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.AspNetCore.Testing;
namespace Microsoft.AspNetCore.Server.Kestrel.Performance
{
public class Http1LargeWritingBenchmark
{
private TestHttp1Connection _http1Connection;
private DuplexPipe.DuplexPipePair _pair;
private MemoryPool<byte> _memoryPool;
private Task _consumeResponseBodyTask;
// Keep this divisable by 10 so it can be evenly segmented.
private readonly byte[] _writeData = new byte[10 * 1024 * 1024];
[GlobalSetup]
public void GlobalSetup()
{
_memoryPool = KestrelMemoryPool.Create();
_http1Connection = MakeHttp1Connection();
_consumeResponseBodyTask = ConsumeResponseBody();
}
[IterationSetup]
public void Setup()
{
_http1Connection.Reset();
_http1Connection.RequestHeaders.ContentLength = _writeData.Length;
_http1Connection.FlushAsync().GetAwaiter().GetResult();
}
[Benchmark]
public Task WriteAsync()
{
return _http1Connection.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default);
}
[Benchmark]
public Task WriteSegmentsUnawaitedAsync()
{
// Write a 10th the of the data at a time
var segmentSize = _writeData.Length / 10;
for (int i = 0; i < 9; i++)
{
// Ignore the first nine tasks.
_ = _http1Connection.ResponseBody.WriteAsync(_writeData, i * segmentSize, segmentSize, default);
}
return _http1Connection.ResponseBody.WriteAsync(_writeData, 9 * segmentSize, segmentSize, default);
}
private TestHttp1Connection MakeHttp1Connection()
{
var options = new PipeOptions(_memoryPool, useSynchronizationContext: false);
var pair = DuplexPipe.CreateConnectionPair(options, options);
_pair = pair;
var serviceContext = new ServiceContext
{
DateHeaderValueManager = new DateHeaderValueManager(),
ServerOptions = new KestrelServerOptions(),
Log = new MockTrace(),
HttpParser = new HttpParser<Http1ParsingHandler>()
};
var http1Connection = new TestHttp1Connection(new HttpConnectionContext
{
ServiceContext = serviceContext,
ConnectionFeatures = new FeatureCollection(),
MemoryPool = _memoryPool,
TimeoutControl = new TimeoutControl(timeoutHandler: null),
Transport = pair.Transport
});
http1Connection.Reset();
http1Connection.InitializeBodyControl(MessageBody.ZeroContentLengthKeepAlive);
serviceContext.DateHeaderValueManager.OnHeartbeat(DateTimeOffset.UtcNow);
return http1Connection;
}
private async Task ConsumeResponseBody()
{
var reader = _pair.Application.Input;
var readResult = await reader.ReadAsync();
while (!readResult.IsCompleted)
{
reader.AdvanceTo(readResult.Buffer.End);
readResult = await reader.ReadAsync();
}
reader.Complete();
}
[GlobalCleanup]
public void Dispose()
{
_pair.Transport.Output.Complete();
_consumeResponseBodyTask.GetAwaiter().GetResult();
_memoryPool?.Dispose();
}
}
}

View File

@ -29,6 +29,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
private TestHttp1Connection _http1Connection;
private DuplexPipe.DuplexPipePair _pair;
private MemoryPool<byte> _memoryPool;
private Task _consumeResponseBodyTask;
private readonly byte[] _writeData = Encoding.ASCII.GetBytes("Hello, World!");
@ -37,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
{
_memoryPool = KestrelMemoryPool.Create();
_http1Connection = MakeHttp1Connection();
_consumeResponseBodyTask = ConsumeResponseBody();
}
[Params(true, false)]
@ -125,14 +127,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
return http1Connection;
}
[IterationCleanup]
public void Cleanup()
private async Task ConsumeResponseBody()
{
var reader = _pair.Application.Input;
if (reader.TryRead(out var readResult))
var readResult = await reader.ReadAsync();
while (!readResult.IsCompleted)
{
reader.AdvanceTo(readResult.Buffer.End);
readResult = await reader.ReadAsync();
}
reader.Complete();
}
public enum Startup
@ -145,6 +151,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
[GlobalCleanup]
public void Dispose()
{
_pair.Transport.Output.Complete();
_consumeResponseBodyTask.GetAwaiter().GetResult();
_memoryPool?.Dispose();
}
}