Reuse Http2OutputProducer.ProcessDataWrites task (#19695)

This commit is contained in:
James Newton-King 2020-03-10 15:45:24 +13:00 committed by GitHub
parent 8ec31594ea
commit cd6e6ae0bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 184 additions and 69 deletions

View File

@ -55,7 +55,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private int _highestOpenedStreamId;
private bool _gracefulCloseStarted;
private readonly Dictionary<int, Http2Stream> _streams = new Dictionary<int, Http2Stream>();
private int _clientActiveStreamCount = 0;
private int _serverActiveStreamCount = 0;
@ -66,6 +65,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private int _isClosed;
// Internal for testing
internal readonly Dictionary<int, Http2Stream> _streams = new Dictionary<int, Http2Stream>();
internal Http2StreamStack StreamPool;
internal const int InitialStreamPoolSize = 5;
@ -304,6 +304,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
UpdateCompletedStreams();
}
while (StreamPool.TryPop(out var pooledStream))
{
pooledStream.Dispose();
}
// This cancels keep-alive and request header timeouts, but not the response drain timeout.
TimeoutControl.CancelTimeout();
TimeoutControl.StartDrainTimeout(Limits.MinResponseDataRate, Limits.MaxResponseBufferSize);
@ -909,6 +914,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
catch (Http2StreamErrorException)
{
_currentHeadersStream.Dispose();
ResetRequestHeaderParsingState();
throw;
}
@ -1069,11 +1075,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
throw new Http2ConnectionErrorException(CoreStrings.FormatHttp2ErrorStreamClosed(_incomingFrame.Type, _incomingFrame.StreamId), Http2ErrorCode.STREAM_CLOSED);
}
_streams.Remove(stream.StreamId);
if (stream.CanReuse)
{
ReturnStream(stream);
}
RemoveStream(stream);
}
else
{
@ -1087,6 +1089,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
private void RemoveStream(Http2Stream stream)
{
_streams.Remove(stream.StreamId);
if (stream.CanReuse)
{
ReturnStream(stream);
}
else
{
stream.Dispose();
}
}
// Compare to UpdateCompletedStreams, but only removes streams if over the max stream drain limit.
private void MakeSpaceInDrainQueue()
{
@ -1099,11 +1114,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_serverActiveStreamCount--;
}
_streams.Remove(stream.StreamId);
if (stream.CanReuse)
{
ReturnStream(stream);
}
RemoveStream(stream);
}
}
@ -1460,7 +1471,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
Unknown = 0x40000000
}
private static class GracefulCloseInitiator
{
public const int None = 0;

View File

@ -7,6 +7,7 @@ using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
@ -17,7 +18,7 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter
internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IValueTaskSource<FlushResult>, IDisposable
{
private int StreamId => _stream.StreamId;
private readonly Http2FrameWriter _frameWriter;
@ -33,14 +34,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly Pipe _pipe;
private readonly ConcurrentPipeWriter _pipeWriter;
private readonly PipeReader _pipeReader;
private ValueTask<FlushResult> _dataWriteProcessingTask;
private readonly ManualResetValueTaskSource<object> _resetAwaitable = new ManualResetValueTaskSource<object>();
private IMemoryOwner<byte> _fakeMemoryOwner;
private bool _startedWritingDataFrames;
private bool _streamCompleted;
private bool _suffixSent;
private bool _streamEnded;
private bool _writerComplete;
private bool _disposed;
private IMemoryOwner<byte> _fakeMemoryOwner;
// Internal for testing
internal ValueTask _dataWriteProcessingTask;
/// <summary>The core logic for the IValueTaskSource implementation.</summary>
private ManualResetValueTaskSourceCore<FlushResult> _responseCompleteTaskSource = new ManualResetValueTaskSourceCore<FlushResult> { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly
// This object is itself usable as a backing source for ValueTask. Since there's only ever one awaiter
// for this object's state transitions at a time, we allow the object to be awaited directly. All functionality
// associated with the implementation is just delegated to the ManualResetValueTaskSourceCore.
private ValueTask<FlushResult> GetWaiterTask() => new ValueTask<FlushResult>(this, _responseCompleteTaskSource.Version);
ValueTaskSourceStatus IValueTaskSource<FlushResult>.GetStatus(short token) => _responseCompleteTaskSource.GetStatus(token);
void IValueTaskSource<FlushResult>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _responseCompleteTaskSource.OnCompleted(continuation, state, token, flags);
FlushResult IValueTaskSource<FlushResult>.GetResult(short token) => _responseCompleteTaskSource.GetResult(token);
public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, StreamOutputFlowControl flowControl)
{
@ -64,7 +79,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public void StreamReset()
{
Debug.Assert(_dataWriteProcessingTask.IsCompletedSuccessfully);
// Data background task must still be running.
Debug.Assert(!_dataWriteProcessingTask.IsCompleted);
// Response should have been completed.
Debug.Assert(_responseCompleteTaskSource.GetStatus(_responseCompleteTaskSource.Version) == ValueTaskSourceStatus.Succeeded);
_streamEnded = false;
_suffixSent = false;
@ -75,8 +93,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_pipe.Reset();
_pipeWriter.Reset();
_responseCompleteTaskSource.Reset();
_dataWriteProcessingTask = ProcessDataWrites();
// Trigger the data process task to resume
_resetAwaitable.SetResult(null);
}
public void Complete()
@ -222,14 +242,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
if (_streamCompleted)
{
return _dataWriteProcessingTask;
return GetWaiterTask();
}
_streamCompleted = true;
_suffixSent = true;
_pipeWriter.Complete();
return _dataWriteProcessingTask;
return GetWaiterTask();
}
}
@ -370,68 +390,90 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
}
private async ValueTask<FlushResult> ProcessDataWrites()
private async ValueTask ProcessDataWrites()
{
FlushResult flushResult = default;
try
// ProcessDataWrites runs for the lifetime of the Http2OutputProducer, and is designed to be reused by multiple streams.
// When Http2OutputProducer is no longer used (e.g. a stream is aborted and will no longer be used, or the connection is closed)
// it should be disposed so ProcessDataWrites exits. Not disposing won't cause a memory leak in release builds, but in debug
// builds active tasks are rooted on Task.s_currentActiveTasks. Dispose could be removed in the future when active tasks are
// tracked by a weak reference. See https://github.com/dotnet/runtime/issues/26565
do
{
ReadResult readResult;
do
FlushResult flushResult = default;
ReadResult readResult = default;
try
{
readResult = await _pipeReader.ReadAsync();
if (readResult.IsCanceled)
do
{
// Response body is aborted, break and complete reader.
break;
}
else if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0)
{
// Output is ending and there are trailers to write
// Write any remaining content then write trailers
if (readResult.Buffer.Length > 0)
readResult = await _pipeReader.ReadAsync();
if (readResult.IsCanceled)
{
// Only flush if required (i.e. content length exceeds flow control availability)
// Writing remaining content without flushing allows content and trailers to be sent in the same packet
await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false);
// Response body is aborted, break and complete reader.
break;
}
_stream.ResponseTrailers.SetReadOnly();
_stream.DecrementActiveClientStreamCount();
flushResult = await _frameWriter.WriteResponseTrailers(StreamId, _stream.ResponseTrailers);
}
else if (readResult.IsCompleted && _streamEnded)
{
if (readResult.Buffer.Length != 0)
else if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0)
{
ThrowUnexpectedState();
}
// Output is ending and there are trailers to write
// Write any remaining content then write trailers
if (readResult.Buffer.Length > 0)
{
// Only flush if required (i.e. content length exceeds flow control availability)
// Writing remaining content without flushing allows content and trailers to be sent in the same packet
await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream: false, forceFlush: false);
}
// Headers have already been written and there is no other content to write
flushResult = await _frameWriter.FlushAsync(outputAborter: null, cancellationToken: default);
}
else
{
var endStream = readResult.IsCompleted;
if (endStream)
{
_stream.ResponseTrailers.SetReadOnly();
_stream.DecrementActiveClientStreamCount();
flushResult = await _frameWriter.WriteResponseTrailers(StreamId, _stream.ResponseTrailers);
}
flushResult = await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream, forceFlush: true);
}
else if (readResult.IsCompleted && _streamEnded)
{
if (readResult.Buffer.Length != 0)
{
ThrowUnexpectedState();
}
_pipeReader.AdvanceTo(readResult.Buffer.End);
} while (!readResult.IsCompleted);
}
catch (Exception ex)
{
_log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception.");
}
// Headers have already been written and there is no other content to write
flushResult = await _frameWriter.FlushAsync(outputAborter: null, cancellationToken: default);
}
else
{
var endStream = readResult.IsCompleted;
if (endStream)
{
_stream.DecrementActiveClientStreamCount();
}
flushResult = await _frameWriter.WriteDataAsync(StreamId, _flowControl, readResult.Buffer, endStream, forceFlush: true);
}
_pipeReader.Complete();
_pipeReader.AdvanceTo(readResult.Buffer.End);
} while (!readResult.IsCompleted);
}
catch (Exception ex)
{
_log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception.");
}
return flushResult;
_pipeReader.Complete();
// Signal via WriteStreamSuffixAsync to the stream that output has finished.
// Stream state will move to RequestProcessingStatus.ResponseCompleted
_responseCompleteTaskSource.SetResult(flushResult);
if (readResult.IsCompleted)
{
// Successfully read all data. Wait here for the stream to be reset.
await new ValueTask(_resetAwaitable, _resetAwaitable.Version);
_resetAwaitable.Reset();
}
else
{
// Stream was aborted.
break;
}
} while (!_disposed);
static void ThrowUnexpectedState()
{
@ -486,5 +528,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
useSynchronizationContext: false,
minimumSegmentSize: pool.GetMinimumSegmentSize()
));
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_resetAwaitable.SetResult(null);
}
}
}

View File

@ -17,7 +17,7 @@ using Microsoft.Net.Http.Headers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
internal abstract partial class Http2Stream : HttpProtocol, IThreadPoolWorkItem
internal abstract partial class Http2Stream : HttpProtocol, IThreadPoolWorkItem, IDisposable
{
private Http2StreamContext _context;
private Http2OutputProducer _http2Output;
@ -588,6 +588,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
/// </summary>
public abstract void Execute();
public void Dispose()
{
_http2Output.Dispose();
}
[Flags]
private enum StreamCompletionFlags
{

View File

@ -76,12 +76,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[Fact]
public async Task StreamPool_SingleStream_ReturnedToPool()
{
await InitializeConnectionAsync(_echoApplication);
var serverTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await InitializeConnectionAsync(async context =>
{
await serverTcs.Task;
await _echoApplication(context);
});
Assert.Equal(0, _connection.StreamPool.Count);
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
var stream = _connection._streams[1];
serverTcs.SetResult(null);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)(Http2HeadersFrameFlags.END_HEADERS | Http2HeadersFrameFlags.END_STREAM),
@ -98,6 +107,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.Equal(1, _connection.StreamPool.Count);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
var output = (Http2OutputProducer)stream.Output;
await output._dataWriteProcessingTask;
}
[Fact]
@ -216,14 +228,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[QuarantinedTest]
public async Task StreamPool_StreamIsInvalidState_DontReturnedToPool()
{
var serverTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await InitializeConnectionAsync(async context =>
{
await serverTcs.Task;
await context.Response.WriteAsync("Content");
throw new InvalidOperationException("Put the stream into an invalid state by throwing after writing to response.");
});
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
var stream = _connection._streams[1];
serverTcs.SetResult(null);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 33,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
@ -244,6 +263,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
// Stream is not returned to the pool
Assert.Equal(0, _connection.StreamPool.Count);
var output = (Http2OutputProducer)stream.Output;
await output._dataWriteProcessingTask;
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
}

View File

@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Threading.Tasks.Sources;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
internal sealed class ManualResetValueTaskSource<T> : IValueTaskSource<T>, IValueTaskSource
{
private ManualResetValueTaskSourceCore<T> _core; // mutable struct; do not make this readonly
public bool RunContinuationsAsynchronously { get => _core.RunContinuationsAsynchronously; set => _core.RunContinuationsAsynchronously = value; }
public short Version => _core.Version;
public void Reset() => _core.Reset();
public void SetResult(T result) => _core.SetResult(result);
public void SetException(Exception error) => _core.SetException(error);
public T GetResult(short token) => _core.GetResult(token);
void IValueTaskSource.GetResult(short token) => _core.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);
}
}