Wrap the PipeWriter used by Kestrel's output writing logic (#12081)

This commit is contained in:
Stephen Halter 2019-07-16 23:06:24 -07:00 committed by GitHub
parent fb12909417
commit 96f55fcf25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1077 additions and 248 deletions

View File

@ -12,6 +12,7 @@ using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
@ -45,7 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private long _unflushedBytes;
private int _currentMemoryPrefixBytes;
private readonly PipeWriter _pipeWriter;
private readonly ConcurrentPipeWriter _pipeWriter;
private IMemoryOwner<byte> _fakeMemoryOwner;
// Chunked responses need to be treated uniquely when using GetMemory + Advance.
@ -81,18 +82,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
IHttpMinResponseDataRateFeature minResponseDataRateFeature,
MemoryPool<byte> memoryPool)
{
_pipeWriter = pipeWriter;
// Allow appending more data to the PipeWriter when a flush is pending.
_pipeWriter = new ConcurrentPipeWriter(pipeWriter, memoryPool);
_connectionId = connectionId;
_connectionContext = connectionContext;
_log = log;
_minResponseDataRateFeature = minResponseDataRateFeature;
_flusher = new TimingPipeFlusher(pipeWriter, timeoutControl, log);
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl, log);
_memoryPool = memoryPool;
}
// For tests
internal PipeWriter PipeWriter => _pipeWriter;
public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
@ -408,6 +407,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
lock (_contextLock)
{
_pipeWriter.Abort();
if (_fakeMemoryOwner != null)
{
_fakeMemoryOwner.Dispose();

View File

@ -87,6 +87,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
context.TimeoutControl,
httpLimits.MinResponseDataRate,
context.ConnectionId,
context.MemoryPool,
context.ServiceContext.Log);
_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);

View File

@ -15,6 +15,7 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
@ -26,7 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly object _writeLock = new object();
private readonly Http2Frame _outgoingFrame;
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly PipeWriter _outputWriter;
private readonly ConcurrentPipeWriter _outputWriter;
private readonly ConnectionContext _connectionContext;
private readonly Http2Connection _http2Connection;
private readonly OutputFlowControl _connectionOutputFlowControl;
@ -51,9 +52,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
ITimeoutControl timeoutControl,
MinDataRate minResponseDataRate,
string connectionId,
MemoryPool<byte> memoryPool,
IKestrelTrace log)
{
_outputWriter = outputPipeWriter;
// Allow appending more data to the PipeWriter when a flush is pending.
_outputWriter = new ConcurrentPipeWriter(outputPipeWriter, memoryPool);
_connectionContext = connectionContext;
_http2Connection = http2Connection;
_connectionOutputFlowControl = connectionOutputFlowControl;
@ -89,6 +92,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_completed = true;
_connectionOutputFlowControl.Abort();
_outputWriter.Abort();
}
}

View File

@ -12,6 +12,7 @@ using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
@ -29,7 +30,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly MemoryPool<byte> _memoryPool;
private readonly Http2Stream _stream;
private readonly object _dataWriterLock = new object();
private readonly Pipe _dataPipe;
private readonly PipeWriter _pipeWriter;
private readonly PipeReader _pipeReader;
private readonly ValueTask<FlushResult> _dataWriteProcessingTask;
private bool _startedWritingDataFrames;
private bool _completed;
@ -43,7 +45,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
int streamId,
Http2FrameWriter frameWriter,
StreamOutputFlowControl flowControl,
ITimeoutControl timeoutControl,
MemoryPool<byte> pool,
Http2Stream stream,
IKestrelTrace log)
@ -55,8 +56,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_stream = stream;
_log = log;
_dataPipe = CreateDataPipe(pool);
_flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl, log);
var pipe = CreateDataPipe(pool);
_pipeWriter = new ConcurrentPipeWriter(pipe.Writer, pool);
_pipeReader = pipe.Reader;
// No need to pass in timeoutControl here, since no minDataRates are passed to the TimingPipeFlusher.
// The minimum output data rate is enforced at the connection level by Http2FrameWriter.
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, log);
_dataWriteProcessingTask = ProcessDataWrites();
}
@ -193,7 +200,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_startedWritingDataFrames = true;
_dataPipe.Writer.Write(data);
_pipeWriter.Write(data);
return _flusher.FlushAsync(this, cancellationToken).GetAsTask();
}
}
@ -210,7 +217,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_completed = true;
_suffixSent = true;
_dataPipe.Writer.Complete();
_pipeWriter.Complete();
return _dataWriteProcessingTask;
}
}
@ -239,7 +246,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_startedWritingDataFrames = true;
_dataPipe.Writer.Advance(bytes);
_pipeWriter.Advance(bytes);
}
}
@ -254,7 +261,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
return GetFakeMemory(sizeHint).Span;
}
return _dataPipe.Writer.GetSpan(sizeHint);
return _pipeWriter.GetSpan(sizeHint);
}
}
@ -269,7 +276,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
return GetFakeMemory(sizeHint);
}
return _dataPipe.Writer.GetMemory(sizeHint);
return _pipeWriter.GetMemory(sizeHint);
}
}
@ -282,7 +289,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
return;
}
_dataPipe.Writer.CancelPendingFlush();
_pipeWriter.CancelPendingFlush();
}
}
@ -306,7 +313,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_startedWritingDataFrames = true;
_dataPipe.Writer.Write(data);
_pipeWriter.Write(data);
return _flusher.FlushAsync(this, cancellationToken);
}
}
@ -345,7 +352,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
// Complete with an exception to prevent an end of stream data frame from being sent without an
// explicit call to WriteStreamSuffixAsync. ConnectionAbortedExceptions are swallowed, so the
// message doesn't matter
_dataPipe.Writer.Complete(new OperationCanceledException());
_pipeWriter.Complete(new OperationCanceledException());
_frameWriter.AbortPendingStreamDataWrites(_flowControl);
}
@ -364,7 +371,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
do
{
readResult = await _dataPipe.Reader.ReadAsync();
readResult = await _pipeReader.ReadAsync();
if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0)
{
@ -393,7 +400,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
flushResult = await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted);
}
_dataPipe.Reader.AdvanceTo(readResult.Buffer.End);
_pipeReader.AdvanceTo(readResult.Buffer.End);
} while (!readResult.IsCompleted);
}
catch (OperationCanceledException)
@ -405,7 +412,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception.");
}
_dataPipe.Reader.Complete();
_pipeReader.Complete();
return flushResult;

View File

@ -51,7 +51,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
context.StreamId,
context.FrameWriter,
_outputFlowControl,
context.TimeoutControl,
context.MemoryPool,
this,
context.ServiceContext.Log);

View File

@ -0,0 +1,130 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
namespace System.IO.Pipelines
{
// Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
private object _memoryOwner;
private BufferSegment _next;
private int _end;
/// <summary>
/// The End represents the offset into AvailableMemory where the range of "active" bytes ends. At the point when the block is leased
/// the End is guaranteed to be equal to Start. The value of Start may be assigned anywhere between 0 and
/// Buffer.Length, and must be equal to or less than End.
/// </summary>
public int End
{
get => _end;
set
{
Debug.Assert(value <= AvailableMemory.Length);
_end = value;
Memory = AvailableMemory.Slice(0, value);
}
}
/// <summary>
/// Reference to the next block of data when the overall "active" bytes spans multiple blocks. At the point when the block is
/// leased Next is guaranteed to be null. Start, End, and Next are used together in order to create a linked-list of discontiguous
/// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active"
/// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool.
/// </summary>
public BufferSegment NextSegment
{
get => _next;
set
{
Next = value;
_next = value;
}
}
public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner)
{
_memoryOwner = memoryOwner;
AvailableMemory = memoryOwner.Memory;
}
public void SetOwnedMemory(byte[] arrayPoolBuffer)
{
_memoryOwner = arrayPoolBuffer;
AvailableMemory = arrayPoolBuffer;
}
public void SetUnownedMemory(Memory<byte> memory)
{
AvailableMemory = memory;
}
public void ResetMemory()
{
if (_memoryOwner is IMemoryOwner<byte> owner)
{
owner.Dispose();
}
else if (_memoryOwner is byte[] array)
{
ArrayPool<byte>.Shared.Return(array);
}
// Order of below field clears is significant as it clears in a sequential order
// https://github.com/dotnet/corefx/pull/35256#issuecomment-462800477
Next = null;
RunningIndex = 0;
Memory = default;
_memoryOwner = null;
_next = null;
_end = 0;
AvailableMemory = default;
}
// Exposed for testing
internal object MemoryOwner => _memoryOwner;
public Memory<byte> AvailableMemory { get; private set; }
public int Length => End;
public int WritableBytes
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => AvailableMemory.Length - End;
}
public void SetNext(BufferSegment segment)
{
Debug.Assert(segment != null);
Debug.Assert(Next == null);
NextSegment = segment;
segment = this;
while (segment.Next != null)
{
segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length;
segment = segment.NextSegment;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex)
{
return (endSegment.RunningIndex + (uint)endIndex) - (startSegment.RunningIndex + (uint)startIndex);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
{
return (endSegment.RunningIndex + (uint)endIndex) - startPosition;
}
}
}

View File

@ -0,0 +1,85 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Runtime.CompilerServices;
namespace System.IO.Pipelines
{
// Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
internal struct BufferSegmentStack
{
private SegmentAsValueType[] _array;
private int _size;
public BufferSegmentStack(int size)
{
_array = new SegmentAsValueType[size];
_size = 0;
}
public int Count => _size;
public bool TryPop(out BufferSegment result)
{
int size = _size - 1;
SegmentAsValueType[] array = _array;
if ((uint)size >= (uint)array.Length)
{
result = default;
return false;
}
_size = size;
result = array[size];
array[size] = default;
return true;
}
// Pushes an item to the top of the stack.
public void Push(BufferSegment item)
{
int size = _size;
SegmentAsValueType[] array = _array;
if ((uint)size < (uint)array.Length)
{
array[size] = item;
_size = size + 1;
}
else
{
PushWithResize(item);
}
}
// Non-inline from Stack.Push to improve its code quality as uncommon path
[MethodImpl(MethodImplOptions.NoInlining)]
private void PushWithResize(BufferSegment item)
{
Array.Resize(ref _array, 2 * _array.Length);
_array[_size] = item;
_size++;
}
/// <summary>
/// A simple struct we wrap reference types inside when storing in arrays to
/// bypass the CLR's covariant checks when writing to arrays.
/// </summary>
/// <remarks>
/// We use <see cref="SegmentAsValueType"/> as a wrapper to avoid paying the cost of covariant checks whenever
/// the underlying array that the <see cref="BufferSegmentStack"/> class uses is written to.
/// We've recognized this as a perf win in ETL traces for these stack frames:
/// clr!JIT_Stelem_Ref
/// clr!ArrayStoreCheck
/// clr!ObjIsInstanceOf
/// </remarks>
private readonly struct SegmentAsValueType
{
private readonly BufferSegment _value;
private SegmentAsValueType(BufferSegment value) => _value = value;
public static implicit operator SegmentAsValueType(BufferSegment s) => new SegmentAsValueType(s);
public static implicit operator BufferSegment(SegmentAsValueType s) => s._value;
}
}
}

View File

@ -0,0 +1,409 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers
{
/// <summary>
/// Wraps a PipeWriter so you can start appending more data to the pipe prior to the previous flush completing.
/// </summary>
internal sealed class ConcurrentPipeWriter : PipeWriter
{
// The following constants were copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
// and the associated StreamPipeWriterOptions defaults.
private const int InitialSegmentPoolSize = 4; // 16K
private const int MaxSegmentPoolSize = 256; // 1MB
private const int MinimumBufferSize = 4096; // 4K
private static readonly Exception _successfullyCompletedSentinel = new Exception();
private readonly object _sync = new object();
private readonly PipeWriter _innerPipeWriter;
private readonly MemoryPool<byte> _pool;
private readonly BufferSegmentStack _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
private BufferSegment _head;
private BufferSegment _tail;
private Memory<byte> _tailMemory;
private int _tailBytesBuffered;
private long _bytesBuffered;
// When _currentFlushTcs is null and _head/_tail is also null, the ConcurrentPipeWriter is in passthrough mode.
// When the ConcurrentPipeWriter is not in passthrough mode, that could be for one of two reasons:
//
// 1. A flush of the _innerPipeWriter is in progress.
// 2. Or the last flush of the _innerPipeWriter completed between external calls to GetMemory/Span() and Advance().
//
// In either case, we need to manually append buffer segments until the loop in the current or next call to FlushAsync()
// flushes all the buffers putting the ConcurrentPipeWriter back into passthrough mode.
// The manual buffer appending logic is borrowed from corefx's StreamPipeWriter.
private TaskCompletionSource<FlushResult> _currentFlushTcs;
private bool _bufferedWritePending;
// We're trusting the Http2FrameWriter and Http1OutputProducer to not call into the PipeWriter after calling Abort() or Complete()
// If an abort occurs while a flush is in progress, we clean up after the flush completes, and don't flush again.
private bool _aborted;
private Exception _completeException;
public ConcurrentPipeWriter(PipeWriter innerPipeWriter, MemoryPool<byte> pool)
{
_innerPipeWriter = innerPipeWriter;
_pool = pool;
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
lock (_sync)
{
if (_currentFlushTcs == null && _head == null)
{
return _innerPipeWriter.GetMemory(sizeHint);
}
AllocateMemoryUnsynchronized(sizeHint);
return _tailMemory;
}
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
lock (_sync)
{
if (_currentFlushTcs == null && _head == null)
{
return _innerPipeWriter.GetSpan(sizeHint);
}
AllocateMemoryUnsynchronized(sizeHint);
return _tailMemory.Span;
}
}
public override void Advance(int bytes)
{
lock (_sync)
{
if (_currentFlushTcs == null && _head == null)
{
_innerPipeWriter.Advance(bytes);
return;
}
if ((uint)bytes > (uint)_tailMemory.Length)
{
ThrowArgumentOutOfRangeException(nameof(bytes));
}
_tailBytesBuffered += bytes;
_bytesBuffered += bytes;
_tailMemory = _tailMemory.Slice(bytes);
_bufferedWritePending = false;
}
}
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
lock (_sync)
{
if (_currentFlushTcs != null)
{
return new ValueTask<FlushResult>(_currentFlushTcs.Task);
}
if (_bytesBuffered > 0)
{
CopyAndReturnSegmentsUnsynchronized();
}
var flushTask = _innerPipeWriter.FlushAsync(cancellationToken);
if (flushTask.IsCompletedSuccessfully)
{
if (_currentFlushTcs != null)
{
CompleteFlushUnsynchronized(flushTask.GetAwaiter().GetResult(), null);
}
return flushTask;
}
// Use a TCS instead of something resettable so it can be awaited by multiple awaiters.
_currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
var result = new ValueTask<FlushResult>(_currentFlushTcs.Task);
// FlushAsyncAwaited clears the TCS prior to completing. Make sure to construct the ValueTask
// from the TCS before calling FlushAsyncAwaited in case FlushAsyncAwaited completes inline.
_ = FlushAsyncAwaited(flushTask, cancellationToken);
return result;
}
}
private async Task FlushAsyncAwaited(ValueTask<FlushResult> flushTask, CancellationToken cancellationToken)
{
try
{
// This while (true) does look scary, but the real continuation condition is at the start of the loop
// after the await, so the _sync lock can be acquired.
while (true)
{
var flushResult = await flushTask;
lock (_sync)
{
if (_bytesBuffered == 0 || _aborted)
{
CompleteFlushUnsynchronized(flushResult, null);
return;
}
if (flushResult.IsCanceled)
{
// Complete anyone currently awaiting a flush since CancelPendingFlush() was called
_currentFlushTcs.SetResult(flushResult);
// Reset _currentFlushTcs, so we don't enter passthrough mode while we're still flushing.
_currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
}
CopyAndReturnSegmentsUnsynchronized();
flushTask = _innerPipeWriter.FlushAsync(cancellationToken);
}
}
}
catch (Exception ex)
{
lock (_sync)
{
CompleteFlushUnsynchronized(default, ex);
}
}
}
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
_innerPipeWriter.OnReaderCompleted(callback, state);
}
public override void CancelPendingFlush()
{
// We propagate IsCanceled when we do multiple flushes in a loop. If FlushResult.IsCanceled is true with more data pending to flush,
// _currentFlushTcs with canceled flush task, but rekick the FlushAsync loop.
_innerPipeWriter.CancelPendingFlush();
}
public override void Complete(Exception exception = null)
{
lock (_sync)
{
// We store the complete exception or s sentinel exception instance in a field if a flush was ongoing.
// We call the inner Complete() method after the flush loop ended.
// To simply ensure everything gets returned after the PipeWriter is left in some unknown state (say GetMemory() was
// called but not Advance(), or there's a flush pending), but you don't want to complete the inner pipe, just call Abort().
_completeException = exception ?? _successfullyCompletedSentinel;
if (_currentFlushTcs == null)
{
if (_bytesBuffered > 0)
{
CopyAndReturnSegmentsUnsynchronized();
}
CleanupSegmentsUnsynchronized();
_innerPipeWriter.Complete(exception);
}
}
}
public void Abort()
{
lock (_sync)
{
_aborted = true;
// If we're flushing, the cleanup will happen after the flush.
if (_currentFlushTcs == null)
{
CleanupSegmentsUnsynchronized();
}
}
}
private void CleanupSegmentsUnsynchronized()
{
BufferSegment segment = _head;
while (segment != null)
{
BufferSegment returnSegment = segment;
segment = segment.NextSegment;
returnSegment.ResetMemory();
}
_head = null;
_tail = null;
_tailMemory = null;
}
private void CopyAndReturnSegmentsUnsynchronized()
{
// Update any buffered data
_tail.End += _tailBytesBuffered;
_tailBytesBuffered = 0;
var segment = _head;
while (segment != null)
{
_innerPipeWriter.Write(segment.Memory.Span);
var returnSegment = segment;
segment = segment.NextSegment;
// We haven't reached the tail of the linked list yet, so we can always return the returnSegment.
if (segment != null)
{
returnSegment.ResetMemory();
ReturnSegmentUnsynchronized(returnSegment);
}
}
if (_bufferedWritePending)
{
// If an advance is pending, so is a flush, so the _tail segment should still get returned eventually.
_head = _tail;
}
else
{
_tail.ResetMemory();
ReturnSegmentUnsynchronized(_tail);
_head = _tail = null;
}
// Even if a non-passthrough call to Advance is pending, there a 0 bytes currently buffered.
_bytesBuffered = 0;
}
private void CompleteFlushUnsynchronized(FlushResult flushResult, Exception flushEx)
{
// Ensure all blocks are returned prior to the last call to FlushAsync() completing.
if (_completeException != null || _aborted)
{
CleanupSegmentsUnsynchronized();
}
if (ReferenceEquals(_completeException, _successfullyCompletedSentinel))
{
_innerPipeWriter.Complete();
}
else if (_completeException != null)
{
_innerPipeWriter.Complete(_completeException);
}
if (flushEx != null)
{
_currentFlushTcs.SetException(flushEx);
}
else
{
_currentFlushTcs.SetResult(flushResult);
}
_currentFlushTcs = null;
}
// The methods below were copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
private void AllocateMemoryUnsynchronized(int sizeHint)
{
_bufferedWritePending = true;
if (_head == null)
{
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = AllocateSegmentUnsynchronized(sizeHint);
// Set all the pointers
_head = _tail = newSegment;
_tailBytesBuffered = 0;
}
else
{
int bytesLeftInBuffer = _tailMemory.Length;
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
if (_tailBytesBuffered > 0)
{
// Flush buffered data to the segment
_tail.End += _tailBytesBuffered;
_tailBytesBuffered = 0;
}
BufferSegment newSegment = AllocateSegmentUnsynchronized(sizeHint);
_tail.SetNext(newSegment);
_tail = newSegment;
}
}
}
private BufferSegment AllocateSegmentUnsynchronized(int sizeHint)
{
BufferSegment newSegment = CreateSegmentUnsynchronized();
if (sizeHint <= _pool.MaxBufferSize)
{
// Use the specified pool if it fits
newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, _pool.MaxBufferSize)));
}
else
{
// We can't use the pool so allocate an array
newSegment.SetUnownedMemory(new byte[sizeHint]);
}
_tailMemory = newSegment.AvailableMemory;
return newSegment;
}
private BufferSegment CreateSegmentUnsynchronized()
{
if (_bufferSegmentPool.TryPop(out BufferSegment segment))
{
return segment;
}
return new BufferSegment();
}
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
if (_bufferSegmentPool.Count < MaxSegmentPoolSize)
{
_bufferSegmentPool.Push(segment);
}
}
private static int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
{
// First we need to handle case where hint is smaller than minimum segment size
sizeHint = Math.Max(MinimumBufferSize, sizeHint);
// After that adjust it to fit into pools max buffer size
var adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
return adjustedToMaximumSize;
}
// Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.Memory/src/System/ThrowHelper.cs
private static void ThrowArgumentOutOfRangeException(string argumentName) { throw CreateArgumentOutOfRangeException(argumentName); }
[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateArgumentOutOfRangeException(string argumentName) { return new ArgumentOutOfRangeException(argumentName); }
}
}

View File

@ -9,7 +9,7 @@ using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers
{
/// <summary>
/// This wraps PipeWriter.FlushAsync() in a way that allows multiple awaiters making it safe to call from publicly
@ -21,15 +21,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
private readonly ITimeoutControl _timeoutControl;
private readonly IKestrelTrace _log;
private readonly object _flushLock = new object();
// This field should only be get or set under the _flushLock. This is a ValueTask that was either:
// 1. The default value where "IsCompleted" is true
// 2. Created by an async method
// 3. Constructed explicitely from a completed result
// This means it should be safe to await a single _lastFlushTask instance multiple times.
private ValueTask<FlushResult> _lastFlushTask;
public TimingPipeFlusher(
PipeWriter writer,
ITimeoutControl timeoutControl,
@ -56,25 +47,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
}
public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
// https://github.com/dotnet/corefxlab/issues/1334
// Pipelines don't support multiple awaiters on flush.
lock (_flushLock)
{
if (_lastFlushTask.IsCompleted)
{
_lastFlushTask = TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
}
else
{
_lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken);
}
return _lastFlushTask;
}
}
private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
var pipeFlushTask = _writer.FlushAsync(cancellationToken);
@ -109,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
catch (Exception ex)
{
// A canceled token is the only reason flush should ever throw.
_log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(TimeFlushAsync)}.");
_log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(FlushAsync)}.");
}
finally
{
@ -123,11 +95,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
return default;
}
private async ValueTask<FlushResult> AwaitLastFlushAndTimeFlushAsync(ValueTask<FlushResult> lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
{
await lastFlushTask;
return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
}
}
}

View File

@ -110,10 +110,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate)
{
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions)
{
Log = _logger
};
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions);
certificateRequired = false;
}
else
@ -150,10 +147,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
}
return true;
}))
{
Log = _logger
};
}));
certificateRequired = true;
}

View File

@ -6,7 +6,6 @@ using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
@ -17,9 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
private readonly Pipe _input;
private readonly Pipe _output;
private Task _inputTask;
private Task _outputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
private readonly int _minAllocBufferSize;
@ -42,26 +39,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
useSynchronizationContext: false);
var outputOptions = new PipeOptions(pool: writerOptions.Pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
minimumSegmentSize: writerOptions.MinimumBufferSize,
useSynchronizationContext: false);
_minAllocBufferSize = writerOptions.MinimumBufferSize;
_input = new Pipe(inputOptions);
// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
// those patterns are fixed.
_output = new Pipe(outputOptions);
Output = PipeWriter.Create(Stream, writerOptions);
}
public ILogger Log { get; set; }
public TStream Stream { get; }
public PipeReader Input
@ -70,31 +53,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
if (_inputTask == null)
{
RunAsync();
_inputTask = ReadInputAsync();
}
return _input.Reader;
}
}
public PipeWriter Output
{
get
{
if (_outputTask == null)
{
RunAsync();
}
return _output.Writer;
}
}
public void RunAsync()
{
_inputTask = ReadInputAsync();
_outputTask = WriteOutputAsync();
}
public PipeWriter Output { get; }
public override async ValueTask DisposeAsync()
{
@ -107,27 +73,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_disposed = true;
}
_output.Writer.Complete();
_input.Reader.Complete();
Output.Complete();
if (_outputTask == null)
{
return;
}
if (_outputTask != null)
{
await _outputTask;
}
CancelPendingRead();
if (_inputTask != null)
{
await _inputTask;
}
}
protected override void Dispose(bool disposing)
{
throw new NotSupportedException();
}
private async Task ReadInputAsync()
{
Exception error = null;
@ -171,54 +132,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_input.Writer.Complete(error);
}
}
private async Task WriteOutputAsync()
{
try
{
while (true)
{
var result = await _output.Reader.ReadAsync();
var buffer = result.Buffer;
try
{
if (buffer.IsEmpty)
{
if (result.IsCompleted)
{
break;
}
await Stream.FlushAsync();
}
else if (buffer.IsSingleSegment)
{
await Stream.WriteAsync(buffer.First);
}
else
{
foreach (var memory in buffer)
{
await Stream.WriteAsync(memory);
}
}
}
finally
{
_output.Reader.AdvanceTo(buffer.End);
}
}
}
catch (Exception ex)
{
Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}");
}
finally
{
_output.Reader.Complete();
}
}
}
}

View File

@ -44,7 +44,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
base(transport, stream => new LoggingStream(stream, logger))
{
Log = logger;
}
}
}

View File

@ -0,0 +1,387 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class ConcurrentPipeWriterTests
{
[Fact]
public async Task PassthroughIfAllFlushesAreAwaited()
{
using (var slabPool = new SlabMemoryPool())
using (var diagnosticPool = new DiagnosticMemoryPool(slabPool))
{
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool);
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
pipeWriterFlushTcsArray[0].SetResult(default);
await flushTask0.DefaultTimeout();
memory = concurrentPipeWriter.GetMemory();
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(2, mockPipeWriter.AdvanceCallCount);
var flushTask1 = concurrentPipeWriter.FlushAsync();
Assert.Equal(2, mockPipeWriter.FlushCallCount);
pipeWriterFlushTcsArray[1].SetResult(default);
await flushTask1.DefaultTimeout();
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx).DefaultTimeout();
Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
[Fact]
public async Task QueuesIfFlushIsNotAwaited()
{
using (var slabPool = new SlabMemoryPool())
using (var diagnosticPool = new DiagnosticMemoryPool(slabPool))
{
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool);
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
var flushTask1 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[0].SetResult(default);
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(2, mockPipeWriter.AdvanceCallCount);
Assert.Equal(2, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[1].SetResult(default);
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
// Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times.
Assert.Equal(3, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(3, mockPipeWriter.AdvanceCallCount);
Assert.Equal(3, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);
// Complete isn't called on the inner PipeWriter until the inner flushes have completed.
Assert.Null(mockPipeWriter.CompleteException);
pipeWriterFlushTcsArray[2].SetResult(default);
await flushTask0.DefaultTimeout();
await flushTask1.DefaultTimeout();
Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
[Fact]
public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
{
using (var slabPool = new SlabMemoryPool())
using (var diagnosticPool = new DiagnosticMemoryPool(slabPool))
{
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool);
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
memory = concurrentPipeWriter.GetMemory();
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
// flush completes, and the next flush will pick up the buffered data.
pipeWriterFlushTcsArray[0].SetResult(default);
await flushTask0.DefaultTimeout();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
var flushTask1 = concurrentPipeWriter.FlushAsync();
// Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed.
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
// it might take more or less calls to the inner PipeWriter's GetMemory method to copy all the data.
Assert.Equal(3, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(3, mockPipeWriter.AdvanceCallCount);
Assert.Equal(2, mockPipeWriter.FlushCallCount);
Assert.False(flushTask1.IsCompleted);
pipeWriterFlushTcsArray[1].SetResult(default);
await flushTask1.DefaultTimeout();
// Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times.
Assert.Equal(3, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(3, mockPipeWriter.AdvanceCallCount);
Assert.Equal(2, mockPipeWriter.FlushCallCount);
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);
Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
[Fact]
public async Task CompleteFlushesQueuedBytes()
{
using (var slabPool = new SlabMemoryPool())
using (var diagnosticPool = new DiagnosticMemoryPool(slabPool))
{
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool);
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
memory = concurrentPipeWriter.GetMemory();
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
// flush completes, and the next flush will pick up the buffered data.
pipeWriterFlushTcsArray[0].SetResult(default);
await flushTask0.DefaultTimeout();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
// Complete the ConcurrentPipeWriter without flushing any of the queued data.
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);
// Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed.
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
// it might take more or less calls to the inner PipeWriter's GetMemory method to copy all the data.
Assert.Equal(3, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(3, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
[Fact]
public async Task CancelPendingFlushInterruptsFlushLoop()
{
using (var slabPool = new SlabMemoryPool())
using (var diagnosticPool = new DiagnosticMemoryPool(slabPool))
{
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool);
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
var flushTask1 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
// CancelPendingFlush() does not get queued.
concurrentPipeWriter.CancelPendingFlush();
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false));
Assert.True((await flushTask0.DefaultTimeout()).IsCanceled);
Assert.True((await flushTask1.DefaultTimeout()).IsCanceled);
var flushTask2 = concurrentPipeWriter.FlushAsync();
Assert.False(flushTask2.IsCompleted);
pipeWriterFlushTcsArray[1].SetResult(default);
await flushTask2.DefaultTimeout();
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(2, mockPipeWriter.AdvanceCallCount);
Assert.Equal(2, mockPipeWriter.FlushCallCount);
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);
Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
private class MockPipeWriter : PipeWriter
{
// It's important that this matches SlabMemoryPool._blockSize for all the tests to pass.
private const int SlabMemoryPoolBlockSize = 4096;
private readonly TaskCompletionSource<FlushResult>[] _flushResults;
public MockPipeWriter(TaskCompletionSource<FlushResult>[] flushResults)
{
_flushResults = flushResults;
}
public int GetMemoryCallCount { get; set; }
public int AdvanceCallCount { get; set; }
public int FlushCallCount { get; set; }
public int CancelPendingFlushCallCount { get; set; }
public TaskCompletionSource<object> FlushTcs { get; set; }
public Exception CompleteException { get; set; }
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
FlushCallCount++;
FlushTcs?.TrySetResult(null);
return new ValueTask<FlushResult>(_flushResults[FlushCallCount - 1].Task);
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
GetMemoryCallCount++;
return new Memory<byte>(new byte[sizeHint == 0 ? SlabMemoryPoolBlockSize : sizeHint]);
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
return GetMemory(sizeHint).Span;
}
public override void Advance(int bytes)
{
AdvanceCallCount++;
}
public override void Complete(Exception exception = null)
{
CompleteException = exception;
}
public override void CancelPendingFlush()
{
CancelPendingFlushCallCount++;
}
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
{
throw new NotImplementedException();
}
}
}
}

View File

@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
// Arrange
var pipe = new Pipe(new PipeOptions(_dirtyMemoryPool, PipeScheduler.Inline, PipeScheduler.Inline));
var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, new Mock<IKestrelTrace>().Object);
var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, _dirtyMemoryPool, new Mock<IKestrelTrace>().Object);
// Act
await frameWriter.WriteWindowUpdateAsync(1, 1);
@ -57,7 +57,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
// Arrange
var pipe = new Pipe(new PipeOptions(_dirtyMemoryPool, PipeScheduler.Inline, PipeScheduler.Inline));
var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, new Mock<IKestrelTrace>().Object);
var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, _dirtyMemoryPool, new Mock<IKestrelTrace>().Object);
// Act
await frameWriter.WriteGoAwayAsync(1, Http2ErrorCode.NO_ERROR);

View File

@ -1,67 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class TimingPipeFlusherTests
{
[Fact]
public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall()
{
var mockPipeWriter = new Mock<PipeWriter>();
var pipeWriterFlushTcsArray = new[] {
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};
var pipeWriterFlushCallCount = 0;
mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() =>
{
return new ValueTask<FlushResult>(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task);
});
var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null);
var flushTask0 = timingPipeFlusher.FlushAsync();
var flushTask1 = timingPipeFlusher.FlushAsync();
var flushTask2 = timingPipeFlusher.FlushAsync();
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.Equal(1, pipeWriterFlushCallCount);
pipeWriterFlushTcsArray[0].SetResult(default);
await flushTask0.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.True(pipeWriterFlushCallCount <= 2);
pipeWriterFlushTcsArray[1].SetResult(default);
await flushTask1.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.True(flushTask1.IsCompleted);
Assert.False(flushTask2.IsCompleted);
Assert.True(pipeWriterFlushCallCount <= 3);
pipeWriterFlushTcsArray[2].SetResult(default);
await flushTask2.AsTask().DefaultTimeout();
Assert.True(flushTask0.IsCompleted);
Assert.True(flushTask1.IsCompleted);
Assert.True(flushTask2.IsCompleted);
Assert.Equal(3, pipeWriterFlushCallCount);
}
}
}

View File

@ -400,6 +400,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
await _libuvThread.PostAsync(cb => cb(LibuvConstants.ECONNRESET.Value), triggerNextCompleted);
}
await task2Success.DefaultTimeout();
// Second task is now completed
Assert.True(task2Success.IsCompleted);
Assert.False(task2Success.IsCanceled);
@ -578,6 +580,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
await task1Waits.DefaultTimeout();
// First task is completed
Assert.True(task1Waits.IsCompleted);
Assert.False(task1Waits.IsCanceled);
@ -598,6 +602,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
}
await task3Success.DefaultTimeout();
Assert.True(task3Success.IsCompleted);
Assert.False(task3Success.IsCanceled);
Assert.False(task3Success.IsFaulted);;
@ -760,7 +766,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var processor = new LibuvOuputProcessor
{
ProcessingTask = outputTask,
OutputProducer = (Http1OutputProducer)http1Connection.Output
OutputProducer = (Http1OutputProducer)http1Connection.Output,
PipeWriter = pair.Transport.Output,
};
return processor;
@ -769,11 +776,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
private class LibuvOuputProcessor
{
public Http1OutputProducer OutputProducer { get; set; }
public PipeWriter PipeWriter { get; set; }
public Task ProcessingTask { get; set; }
public async ValueTask DisposeAsync()
{
OutputProducer.PipeWriter.Complete();
OutputProducer.Dispose();
PipeWriter.Complete();
await ProcessingTask;
}

View File

@ -12,7 +12,7 @@ namespace System.Threading.Tasks
return task.AsTask().TimeoutAfter(TestConstants.DefaultTimeout);
}
public static Task DefaultTimeout<T>(this ValueTask task)
public static Task DefaultTimeout(this ValueTask task)
{
return task.AsTask().TimeoutAfter(TestConstants.DefaultTimeout);
}

View File

@ -308,11 +308,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await WaitForConnectionErrorAsync<ConnectionAbortedException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: int.MaxValue,
Http2ErrorCode.INTERNAL_ERROR,
null);
Assert.True((await _pair.Application.Input.ReadAsync().AsTask().DefaultTimeout()).IsCompleted);
_mockConnectionContext.Verify(c => c.Abort(It.Is<ConnectionAbortedException>(e =>
e.Message == CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied)), Times.Once);
@ -366,11 +362,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await WaitForConnectionErrorAsync<ConnectionAbortedException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: int.MaxValue,
Http2ErrorCode.INTERNAL_ERROR,
null);
Assert.True((await _pair.Application.Input.ReadAsync().AsTask().DefaultTimeout()).IsCompleted);
_mockConnectionContext.Verify(c => c.Abort(It.Is<ConnectionAbortedException>(e =>
e.Message == CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied)), Times.Once);