Reuse HTTP2 stream pipes to reduce allocations (#19356)

This commit is contained in:
James Newton-King 2020-03-06 08:15:31 +13:00 committed by GitHub
parent 557bcd9ec5
commit 0db0640e6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 200 additions and 89 deletions

View File

@ -26,6 +26,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
public bool IsAvailabilityLow => _flow.Available < _minWindowSizeIncrement;
public void Reset()
{
_flow = new FlowControl((uint)_initialWindowSize);
_pendingUpdateSize = 0;
_windowUpdatesDisabled = false;
}
public bool TryAdvance(int bytes)
{
lock (_flowLock)

View File

@ -37,6 +37,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
}
}
public void Reset(uint initialWindowSize)
{
// When output flow control is reused the client window size needs to be reset.
// The client might have changed the window size before the stream is reused.
_flow = new FlowControl(initialWindowSize);
Debug.Assert((_awaitableQueue?.Count ?? 0) == 0, "Queue should have been emptied by the previous stream.");
}
public void Advance(int bytes)
{
_flow.Advance(bytes);

View File

@ -10,11 +10,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
private readonly InputFlowControl _connectionLevelFlowControl;
private readonly InputFlowControl _streamLevelFlowControl;
private readonly int _streamId;
private int StreamId => _stream.StreamId;
private readonly Http2Stream _stream;
private readonly Http2FrameWriter _frameWriter;
public StreamInputFlowControl(
int streamId,
Http2Stream stream,
Http2FrameWriter frameWriter,
InputFlowControl connectionLevelFlowControl,
uint initialWindowSize,
@ -22,11 +23,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
_connectionLevelFlowControl = connectionLevelFlowControl;
_streamLevelFlowControl = new InputFlowControl(initialWindowSize, minWindowSizeIncrement);
_streamId = streamId;
_stream = stream;
_frameWriter = frameWriter;
}
public void Reset()
{
_streamLevelFlowControl.Reset();
}
public void Advance(int bytes)
{
var connectionSuccess = _connectionLevelFlowControl.TryAdvance(bytes);
@ -52,7 +57,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
if (streamWindowUpdateSize > 0)
{
// Writing with the FrameWriter should only fail if given a canceled token, so just fire and forget.
_ = _frameWriter.WriteWindowUpdateAsync(_streamId, streamWindowUpdateSize);
_ = _frameWriter.WriteWindowUpdateAsync(StreamId, streamWindowUpdateSize);
}
UpdateConnectionWindow(bytes);

View File

@ -24,6 +24,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
public bool IsAborted => _connectionLevelFlowControl.IsAborted || _streamLevelFlowControl.IsAborted;
public void Reset(uint initialWindowSize)
{
_streamLevelFlowControl.Reset(initialWindowSize);
if (_currentConnectionLevelAwaitable != null)
{
Debug.Assert(_currentConnectionLevelAwaitable.IsCompleted, "Should have been completed by the previous stream.");
_currentConnectionLevelAwaitable = null;
}
}
public void Advance(int bytes)
{
_connectionLevelFlowControl.Advance(bytes);

View File

@ -607,6 +607,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private void ReturnStream(Http2Stream stream)
{
// We're conservative about what streams we can reuse.
// If there is a chance the stream is still in use then don't attempt to reuse it.
Debug.Assert(stream.CanReuse);
if (StreamPool.Count < MaxStreamPoolSize)
{
StreamPool.Push(stream);
@ -1066,7 +1070,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
_streams.Remove(stream.StreamId);
ReturnStream(stream);
if (stream.CanReuse)
{
ReturnStream(stream);
}
}
else
{
@ -1093,7 +1100,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
_streams.Remove(stream.StreamId);
ReturnStream(stream);
if (stream.CanReuse)
{
ReturnStream(stream);
}
}
}

View File

@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter
{
private readonly int _streamId;
private int StreamId => _stream.StreamId;
private readonly Http2FrameWriter _frameWriter;
private readonly TimingPipeFlusher _flusher;
private readonly IKestrelTrace _log;
@ -30,53 +30,65 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly MemoryPool<byte> _memoryPool;
private readonly Http2Stream _stream;
private readonly object _dataWriterLock = new object();
private readonly PipeWriter _pipeWriter;
private readonly Pipe _pipe;
private readonly ConcurrentPipeWriter _pipeWriter;
private readonly PipeReader _pipeReader;
private readonly ValueTask<FlushResult> _dataWriteProcessingTask;
private ValueTask<FlushResult> _dataWriteProcessingTask;
private bool _startedWritingDataFrames;
private bool _completed;
private bool _streamCompleted;
private bool _suffixSent;
private bool _streamEnded;
private bool _disposed;
private bool _writerComplete;
private IMemoryOwner<byte> _fakeMemoryOwner;
public Http2OutputProducer(
int streamId,
Http2FrameWriter frameWriter,
StreamOutputFlowControl flowControl,
MemoryPool<byte> pool,
Http2Stream stream,
IKestrelTrace log)
public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, StreamOutputFlowControl flowControl)
{
_streamId = streamId;
_frameWriter = frameWriter;
_flowControl = flowControl;
_memoryPool = pool;
_stream = stream;
_log = log;
_frameWriter = context.FrameWriter;
_flowControl = flowControl;
_memoryPool = context.MemoryPool;
_log = context.ServiceContext.Log;
var pipe = CreateDataPipe(pool);
_pipe = CreateDataPipe(_memoryPool);
_pipeWriter = new ConcurrentPipeWriter(pipe.Writer, pool, _dataWriterLock);
_pipeReader = pipe.Reader;
_pipeWriter = new ConcurrentPipeWriter(_pipe.Writer, _memoryPool, _dataWriterLock);
_pipeReader = _pipe.Reader;
// No need to pass in timeoutControl here, since no minDataRates are passed to the TimingPipeFlusher.
// The minimum output data rate is enforced at the connection level by Http2FrameWriter.
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, log);
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, _log);
_dataWriteProcessingTask = ProcessDataWrites();
}
public void Dispose()
public void StreamReset()
{
Debug.Assert(_dataWriteProcessingTask.IsCompletedSuccessfully);
_streamEnded = false;
_suffixSent = false;
_suffixSent = false;
_startedWritingDataFrames = false;
_streamCompleted = false;
_writerComplete = false;
_pipe.Reset();
_pipeWriter.Reset();
_dataWriteProcessingTask = ProcessDataWrites();
}
public void Complete()
{
lock (_dataWriterLock)
{
if (_disposed)
if (_writerComplete)
{
return;
}
_disposed = true;
_writerComplete = true;
Stop();
@ -107,9 +119,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
if (_completed)
if (_streamCompleted)
{
return default;
}
@ -133,14 +145,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
if (_completed)
if (_streamCompleted)
{
return default;
}
return _frameWriter.Write100ContinueAsync(_streamId);
return _frameWriter.Write100ContinueAsync(StreamId);
}
}
@ -150,7 +162,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
// The HPACK header compressor is stateful, if we compress headers for an aborted stream we must send them.
// Optimize for not compressing or sending them.
if (_completed)
if (_streamCompleted)
{
return;
}
@ -175,7 +187,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
http2HeadersFrame = Http2HeadersFrameFlags.NONE;
}
_frameWriter.WriteResponseHeaders(_streamId, statusCode, http2HeadersFrame, responseHeaders);
_frameWriter.WriteResponseHeaders(StreamId, statusCode, http2HeadersFrame, responseHeaders);
}
}
@ -188,11 +200,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
// This length check is important because we don't want to set _startedWritingDataFrames unless a data
// frame will actually be written causing the headers to be flushed.
if (_completed || data.Length == 0)
if (_streamCompleted || data.Length == 0)
{
return Task.CompletedTask;
}
@ -208,12 +220,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
if (_completed)
if (_streamCompleted)
{
return _dataWriteProcessingTask;
}
_completed = true;
_streamCompleted = true;
_suffixSent = true;
_pipeWriter.Complete();
@ -228,7 +240,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
// Always send the reset even if the response body is _completed. The request body may not have completed yet.
Stop();
return _frameWriter.WriteRstStreamAsync(_streamId, error);
return _frameWriter.WriteRstStreamAsync(StreamId, error);
}
}
@ -236,9 +248,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
if (_completed)
if (_streamCompleted)
{
return;
}
@ -253,9 +265,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
if (_completed)
if (_streamCompleted)
{
return GetFakeMemory(sizeHint).Span;
}
@ -268,9 +280,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
if (_completed)
if (_streamCompleted)
{
return GetFakeMemory(sizeHint);
}
@ -283,7 +295,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
if (_completed)
if (_streamCompleted)
{
return;
}
@ -301,11 +313,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_dataWriterLock)
{
ThrowIfSuffixSentOrDisposed();
ThrowIfSuffixSentOrCompleted();
// This length check is important because we don't want to set _startedWritingDataFrames unless a data
// frame will actually be written causing the headers to be flushed.
if (_completed || data.Length == 0)
if (_streamCompleted || data.Length == 0)
{
return default;
}
@ -341,12 +353,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_dataWriterLock)
{
if (_completed)
if (_streamCompleted)
{
return;
}
_completed = true;
_streamCompleted = true;
_pipeReader.CancelPendingRead();
@ -382,12 +394,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
// Only flush if required (i.e. content length exceeds flow control availability)
// Writing remaining content without flushing allows content and trailers to be sent in the same packet
await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false);
await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false);
}
_stream.ResponseTrailers.SetReadOnly();
_stream.DecrementActiveClientStreamCount();
flushResult = await _frameWriter.WriteResponseTrailers(_streamId, _stream.ResponseTrailers);
flushResult = await _frameWriter.WriteResponseTrailers(StreamId, _stream.ResponseTrailers);
}
else if (readResult.IsCompleted && _streamEnded)
{
@ -406,7 +418,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
_stream.DecrementActiveClientStreamCount();
}
flushResult = await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream, forceFlush: true);
flushResult = await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream, forceFlush: true);
}
_pipeReader.AdvanceTo(readResult.Buffer.End);
@ -438,16 +450,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
[StackTraceHidden]
private void ThrowIfSuffixSentOrDisposed()
private void ThrowIfSuffixSentOrCompleted()
{
if (_suffixSent)
{
ThrowSuffixSent();
}
if (_disposed)
if (_writerComplete)
{
ThrowDisposed();
ThrowWriterComplete();
}
}
@ -458,7 +470,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
[StackTraceHidden]
private static void ThrowDisposed()
private static void ThrowWriterComplete()
{
throw new InvalidOperationException("Cannot write to response after the request has completed.");
}

View File

@ -37,6 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
base.Initialize(context);
CanReuse = false;
_decrementCalled = false;
_completionState = StreamCompletionFlags.None;
InputRemaining = null;
@ -45,27 +46,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_context = context;
_inputFlowControl = new StreamInputFlowControl(
context.StreamId,
context.FrameWriter,
context.ConnectionInputFlowControl,
context.ServerPeerSettings.InitialWindowSize,
context.ServerPeerSettings.InitialWindowSize / 2);
// First time the stream is used we need to create flow control, producer and pipes.
// When a stream is reused these types will be reset and reused.
if (_inputFlowControl == null)
{
_inputFlowControl = new StreamInputFlowControl(
this,
context.FrameWriter,
context.ConnectionInputFlowControl,
context.ServerPeerSettings.InitialWindowSize,
context.ServerPeerSettings.InitialWindowSize / 2);
_outputFlowControl = new StreamOutputFlowControl(
context.ConnectionOutputFlowControl,
context.ClientPeerSettings.InitialWindowSize);
_outputFlowControl = new StreamOutputFlowControl(
context.ConnectionOutputFlowControl,
context.ClientPeerSettings.InitialWindowSize);
_http2Output = new Http2OutputProducer(
context.StreamId,
context.FrameWriter,
_outputFlowControl,
context.MemoryPool,
this,
context.ServiceContext.Log);
_http2Output = new Http2OutputProducer(this, context, _outputFlowControl);
RequestBodyPipe = CreateRequestBodyPipe(context.ServerPeerSettings.InitialWindowSize);
Output = _http2Output;
RequestBodyPipe = CreateRequestBodyPipe();
Output = _http2Output;
}
else
{
_inputFlowControl.Reset();
_outputFlowControl.Reset(context.ClientPeerSettings.InitialWindowSize);
_http2Output.StreamReset();
RequestBodyPipe.Reset();
}
}
public void InitializeWithExistingContext(int streamId)
@ -95,6 +103,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public bool CanReuse { get; private set; }
protected override void OnReset()
{
_keepAlive = true;
@ -137,13 +147,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
_http2Output.Dispose();
_http2Output.Complete();
RequestBodyPipe.Reader.Complete();
// The app can no longer read any more of the request body, so return any bytes that weren't read to the
// connection's flow-control window.
_inputFlowControl.Abort();
// We only want to reuse a stream that has completely finished writing.
// This is to prevent the situation where Http2OutputProducer.ProcessDataWrites
// is still running in the background.
CanReuse = !_keepAlive && HasResponseCompleted;
}
finally
{
@ -543,7 +558,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_context.StreamLifetimeHandler.DecrementActiveClientStreamCount();
}
private Pipe CreateRequestBodyPipe(uint windowSize)
private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
@ -551,8 +566,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
writerScheduler: PipeScheduler.Inline,
// Never pause within the window range. Flow control will prevent more data from being added.
// See the assert in OnDataAsync.
pauseWriterThreshold: windowSize + 1,
resumeWriterThreshold: windowSize + 1,
pauseWriterThreshold: _context.ServerPeerSettings.InitialWindowSize + 1,
resumeWriterThreshold: _context.ServerPeerSettings.InitialWindowSize + 1,
useSynchronizationContext: false,
minimumSegmentSize: _context.MemoryPool.GetMinimumSegmentSize()
));

View File

@ -3,6 +3,7 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
@ -59,6 +60,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeW
_sync = sync;
}
public void Reset()
{
Debug.Assert(_currentFlushTcs == null, "There should not be a pending flush.");
_aborted = false;
_completeException = null;
}
public override Memory<byte> GetMemory(int sizeHint = 0)
{
if (_currentFlushTcs == null && _head == null)

View File

@ -12,6 +12,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
@ -212,7 +213,41 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task StreamPool_EndedStreamErrorsOnStart_ReturnedToPool()
public async Task StreamPool_StreamIsInvalidState_DontReturnedToPool()
{
await InitializeConnectionAsync(async context =>
{
await context.Response.WriteAsync("Content");
throw new InvalidOperationException("Put the stream into an invalid state by throwing after writing to response.");
});
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 7,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await WaitForStreamErrorAsync(1, Http2ErrorCode.INTERNAL_ERROR, null);
// Ping will trigger the stream to be returned to the pool so we can assert it
await SendPingAsync(Http2PingFrameFlags.NONE);
await ExpectAsync(Http2FrameType.PING,
withLength: 8,
withFlags: (byte)Http2PingFrameFlags.ACK,
withStreamId: 0);
// Stream is not returned to the pool
Assert.Equal(0, _connection.StreamPool.Count);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
}
[Fact]
public async Task StreamPool_EndedStreamErrorsOnStart_NotReturnedToPool()
{
await InitializeConnectionAsync(_echoApplication);
@ -231,14 +266,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2PingFrameFlags.ACK,
withStreamId: 0);
// Stream returned to the pool
Assert.Equal(1, _connection.StreamPool.Count);
// Stream not returned to the pool
Assert.Equal(0, _connection.StreamPool.Count);
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
}
[Fact]
public async Task StreamPool_UnendedStreamErrorsOnStart_ReturnedToPool()
public async Task StreamPool_UnendedStreamErrorsOnStart_NotReturnedToPool()
{
_serviceContext.ServerOptions.Limits.MinRequestBodyDataRate = null;
@ -268,8 +303,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2PingFrameFlags.ACK,
withStreamId: 0);
// Stream was returned to the pool because of the drain timeout
Assert.Equal(1, _connection.StreamPool.Count);
// Drain timeout has past but the stream was not returned because it is unfinished
Assert.Equal(0, _connection.StreamPool.Count);
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);