Revert Work around potential race in PipeWriter (#10315)
* Remove Flaky attributes
* Revert "Work around potential race in PipeWriter (#10165)"
This reverts commit bff1d0e6c4.
This commit is contained in:
parent
76b73668a8
commit
a05d555f11
|
|
@ -17,7 +17,6 @@ namespace Templates.Test.SpaTemplateTest
|
|||
: base(projectFactory, browserFixture, output) { }
|
||||
|
||||
[Fact]
|
||||
[Flaky("https://github.com/aspnet/AspNetCore-Internal/issues/2422", FlakyOn.All)]
|
||||
public Task AngularTemplate_Works()
|
||||
=> SpaTemplateImplAsync("angularnoauth", "angular", useLocalDb: false, usesAuth: false);
|
||||
|
||||
|
|
|
|||
|
|
@ -51,39 +51,35 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
|
||||
{
|
||||
// https://github.com/dotnet/corefxlab/issues/1334
|
||||
// Pipelines don't support multiple awaiters on flush.
|
||||
lock (_flushLock)
|
||||
{
|
||||
if (_lastFlushTask != null && !_lastFlushTask.IsCompleted)
|
||||
{
|
||||
_lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken);
|
||||
return new ValueTask<FlushResult>(_lastFlushTask);
|
||||
}
|
||||
|
||||
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
|
||||
{
|
||||
var pipeFlushTask = _writer.FlushAsync(cancellationToken);
|
||||
var flushValueTask = _writer.FlushAsync(cancellationToken);
|
||||
|
||||
if (minRate != null)
|
||||
{
|
||||
_timeoutControl.BytesWrittenToBuffer(minRate, count);
|
||||
}
|
||||
|
||||
if (pipeFlushTask.IsCompletedSuccessfully)
|
||||
if (flushValueTask.IsCompletedSuccessfully)
|
||||
{
|
||||
return new ValueTask<FlushResult>(pipeFlushTask.Result);
|
||||
return new ValueTask<FlushResult>(flushValueTask.Result);
|
||||
}
|
||||
|
||||
_lastFlushTask = TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken);
|
||||
return new ValueTask<FlushResult>(_lastFlushTask);
|
||||
// https://github.com/dotnet/corefxlab/issues/1334
|
||||
// Pipelines don't support multiple awaiters on flush.
|
||||
// While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes,
|
||||
// it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case,
|
||||
// we find previous flush Task which still accounts for any newly committed bytes and await that.
|
||||
lock (_flushLock)
|
||||
{
|
||||
if (_lastFlushTask == null || _lastFlushTask.IsCompleted)
|
||||
{
|
||||
_lastFlushTask = flushValueTask.AsTask();
|
||||
}
|
||||
|
||||
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
|
||||
private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
|
||||
{
|
||||
if (minRate != null)
|
||||
{
|
||||
|
|
@ -92,7 +88,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
try
|
||||
{
|
||||
return await pipeFlushTask;
|
||||
return await _lastFlushTask;
|
||||
}
|
||||
catch (OperationCanceledException ex) when (outputAborter != null)
|
||||
{
|
||||
|
|
@ -115,11 +111,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
|
|||
|
||||
return default;
|
||||
}
|
||||
|
||||
private async Task<FlushResult> AwaitLastFlushAndTimeFlushAsync(Task lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
|
||||
{
|
||||
await lastFlushTask;
|
||||
return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
||||
{
|
||||
public class TimingPipeFlusherTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall()
|
||||
{
|
||||
var mockPipeWriter = new Mock<PipeWriter>();
|
||||
var pipeWriterFlushTcsArray = new[] {
|
||||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
|
||||
};
|
||||
var pipeWriterFlushCallCount = 0;
|
||||
|
||||
mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() =>
|
||||
{
|
||||
return new ValueTask<FlushResult>(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task);
|
||||
});
|
||||
|
||||
var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null);
|
||||
|
||||
var flushTask0 = timingPipeFlusher.FlushAsync();
|
||||
var flushTask1 = timingPipeFlusher.FlushAsync();
|
||||
var flushTask2 = timingPipeFlusher.FlushAsync();
|
||||
|
||||
Assert.False(flushTask0.IsCompleted);
|
||||
Assert.False(flushTask1.IsCompleted);
|
||||
Assert.False(flushTask2.IsCompleted);
|
||||
Assert.Equal(1, pipeWriterFlushCallCount);
|
||||
|
||||
pipeWriterFlushTcsArray[0].SetResult(default);
|
||||
await flushTask0.AsTask().DefaultTimeout();
|
||||
|
||||
Assert.True(flushTask0.IsCompleted);
|
||||
Assert.False(flushTask1.IsCompleted);
|
||||
Assert.False(flushTask2.IsCompleted);
|
||||
Assert.True(pipeWriterFlushCallCount <= 2);
|
||||
|
||||
pipeWriterFlushTcsArray[1].SetResult(default);
|
||||
await flushTask1.AsTask().DefaultTimeout();
|
||||
|
||||
Assert.True(flushTask0.IsCompleted);
|
||||
Assert.True(flushTask1.IsCompleted);
|
||||
Assert.False(flushTask2.IsCompleted);
|
||||
Assert.True(pipeWriterFlushCallCount <= 3);
|
||||
|
||||
pipeWriterFlushTcsArray[2].SetResult(default);
|
||||
await flushTask2.AsTask().DefaultTimeout();
|
||||
|
||||
Assert.True(flushTask0.IsCompleted);
|
||||
Assert.True(flushTask1.IsCompleted);
|
||||
Assert.True(flushTask2.IsCompleted);
|
||||
Assert.Equal(3, pipeWriterFlushCallCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -553,15 +553,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
Assert.False(task1Waits.IsFaulted);
|
||||
|
||||
// following tasks should wait.
|
||||
var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
|
||||
var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
|
||||
|
||||
// Give time for tasks to percolate
|
||||
await _mockLibuv.OnPostTask;
|
||||
|
||||
// Second task is not completed
|
||||
Assert.False(task2Canceled.IsCompleted);
|
||||
Assert.False(task2Canceled.IsCanceled);
|
||||
Assert.False(task2Canceled.IsFaulted);
|
||||
// Third task is not completed
|
||||
Assert.False(task3Canceled.IsCompleted);
|
||||
Assert.False(task3Canceled.IsCanceled);
|
||||
Assert.False(task3Canceled.IsFaulted);
|
||||
|
||||
abortedSource.Cancel();
|
||||
|
||||
|
|
@ -571,29 +571,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
|
||||
}
|
||||
|
||||
// First task is completed
|
||||
// First task is completed
|
||||
Assert.True(task1Waits.IsCompleted);
|
||||
Assert.False(task1Waits.IsCanceled);
|
||||
Assert.False(task1Waits.IsFaulted);
|
||||
|
||||
// Second task is now canceled
|
||||
await Assert.ThrowsAsync<OperationCanceledException>(() => task2Canceled);
|
||||
Assert.True(task2Canceled.IsCanceled);
|
||||
// A final write guarantees that the error is observed by OutputProducer,
|
||||
// but doesn't return a canceled/faulted task.
|
||||
var task4Success = outputProducer.WriteDataAsync(fullBuffer);
|
||||
Assert.True(task4Success.IsCompleted);
|
||||
Assert.False(task4Success.IsCanceled);
|
||||
Assert.False(task4Success.IsFaulted);
|
||||
|
||||
// A final write can still succeed.
|
||||
var task3Success = outputProducer.WriteDataAsync(fullBuffer);
|
||||
// Third task is now canceled
|
||||
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
|
||||
Assert.True(task3Canceled.IsCanceled);
|
||||
|
||||
await _mockLibuv.OnPostTask;
|
||||
|
||||
// Complete the 3rd write
|
||||
// Complete the 4th write
|
||||
while (completeQueue.TryDequeue(out var triggerNextCompleted))
|
||||
{
|
||||
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
|
||||
}
|
||||
|
||||
Assert.True(task3Success.IsCompleted);
|
||||
Assert.False(task3Success.IsCanceled);
|
||||
Assert.False(task3Success.IsFaulted);;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,118 +0,0 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading.Tasks;
|
||||
using BenchmarkDotNet.Attributes;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
||||
{
|
||||
public class Http1LargeWritingBenchmark
|
||||
{
|
||||
private TestHttp1Connection _http1Connection;
|
||||
private DuplexPipe.DuplexPipePair _pair;
|
||||
private MemoryPool<byte> _memoryPool;
|
||||
private Task _consumeResponseBodyTask;
|
||||
|
||||
// Keep this divisable by 10 so it can be evenly segmented.
|
||||
private readonly byte[] _writeData = new byte[10 * 1024 * 1024];
|
||||
|
||||
[GlobalSetup]
|
||||
public void GlobalSetup()
|
||||
{
|
||||
_memoryPool = KestrelMemoryPool.Create();
|
||||
_http1Connection = MakeHttp1Connection();
|
||||
_consumeResponseBodyTask = ConsumeResponseBody();
|
||||
}
|
||||
|
||||
[IterationSetup]
|
||||
public void Setup()
|
||||
{
|
||||
_http1Connection.Reset();
|
||||
_http1Connection.RequestHeaders.ContentLength = _writeData.Length;
|
||||
_http1Connection.FlushAsync().GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
[Benchmark]
|
||||
public Task WriteAsync()
|
||||
{
|
||||
return _http1Connection.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default);
|
||||
}
|
||||
|
||||
[Benchmark]
|
||||
public Task WriteSegmentsUnawaitedAsync()
|
||||
{
|
||||
// Write a 10th the of the data at a time
|
||||
var segmentSize = _writeData.Length / 10;
|
||||
|
||||
for (int i = 0; i < 9; i++)
|
||||
{
|
||||
// Ignore the first nine tasks.
|
||||
_ = _http1Connection.ResponseBody.WriteAsync(_writeData, i * segmentSize, segmentSize, default);
|
||||
}
|
||||
|
||||
return _http1Connection.ResponseBody.WriteAsync(_writeData, 9 * segmentSize, segmentSize, default);
|
||||
}
|
||||
|
||||
private TestHttp1Connection MakeHttp1Connection()
|
||||
{
|
||||
var options = new PipeOptions(_memoryPool, useSynchronizationContext: false);
|
||||
var pair = DuplexPipe.CreateConnectionPair(options, options);
|
||||
_pair = pair;
|
||||
|
||||
var serviceContext = new ServiceContext
|
||||
{
|
||||
DateHeaderValueManager = new DateHeaderValueManager(),
|
||||
ServerOptions = new KestrelServerOptions(),
|
||||
Log = new MockTrace(),
|
||||
HttpParser = new HttpParser<Http1ParsingHandler>()
|
||||
};
|
||||
|
||||
var http1Connection = new TestHttp1Connection(new HttpConnectionContext
|
||||
{
|
||||
ServiceContext = serviceContext,
|
||||
ConnectionFeatures = new FeatureCollection(),
|
||||
MemoryPool = _memoryPool,
|
||||
TimeoutControl = new TimeoutControl(timeoutHandler: null),
|
||||
Transport = pair.Transport
|
||||
});
|
||||
|
||||
http1Connection.Reset();
|
||||
http1Connection.InitializeBodyControl(MessageBody.ZeroContentLengthKeepAlive);
|
||||
serviceContext.DateHeaderValueManager.OnHeartbeat(DateTimeOffset.UtcNow);
|
||||
|
||||
return http1Connection;
|
||||
}
|
||||
|
||||
private async Task ConsumeResponseBody()
|
||||
{
|
||||
var reader = _pair.Application.Input;
|
||||
var readResult = await reader.ReadAsync();
|
||||
|
||||
while (!readResult.IsCompleted)
|
||||
{
|
||||
reader.AdvanceTo(readResult.Buffer.End);
|
||||
readResult = await reader.ReadAsync();
|
||||
}
|
||||
|
||||
reader.Complete();
|
||||
}
|
||||
|
||||
[GlobalCleanup]
|
||||
public void Dispose()
|
||||
{
|
||||
_pair.Transport.Output.Complete();
|
||||
_consumeResponseBodyTask.GetAwaiter().GetResult();
|
||||
_memoryPool?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -29,7 +29,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
private TestHttp1Connection _http1Connection;
|
||||
private DuplexPipe.DuplexPipePair _pair;
|
||||
private MemoryPool<byte> _memoryPool;
|
||||
private Task _consumeResponseBodyTask;
|
||||
|
||||
private readonly byte[] _writeData = Encoding.ASCII.GetBytes("Hello, World!");
|
||||
|
||||
|
|
@ -38,7 +37,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
{
|
||||
_memoryPool = KestrelMemoryPool.Create();
|
||||
_http1Connection = MakeHttp1Connection();
|
||||
_consumeResponseBodyTask = ConsumeResponseBody();
|
||||
}
|
||||
|
||||
[Params(true, false)]
|
||||
|
|
@ -127,18 +125,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
return http1Connection;
|
||||
}
|
||||
|
||||
private async Task ConsumeResponseBody()
|
||||
[IterationCleanup]
|
||||
public void Cleanup()
|
||||
{
|
||||
var reader = _pair.Application.Input;
|
||||
var readResult = await reader.ReadAsync();
|
||||
|
||||
while (!readResult.IsCompleted)
|
||||
if (reader.TryRead(out var readResult))
|
||||
{
|
||||
reader.AdvanceTo(readResult.Buffer.End);
|
||||
readResult = await reader.ReadAsync();
|
||||
}
|
||||
|
||||
reader.Complete();
|
||||
}
|
||||
|
||||
public enum Startup
|
||||
|
|
@ -151,8 +145,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
[GlobalCleanup]
|
||||
public void Dispose()
|
||||
{
|
||||
_pair.Transport.Output.Complete();
|
||||
_consumeResponseBodyTask.GetAwaiter().GetResult();
|
||||
_memoryPool?.Dispose();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue