From 5e4fae2573fc2e6be9bb708931fa12468cb43313 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Tue, 10 Jul 2018 13:02:46 -0700 Subject: [PATCH] Implement HTTP/2 output flow control (#2690) --- src/Kestrel.Core/CoreStrings.resx | 18 + .../Internal/Http/Http1OutputProducer.cs | 102 +- .../Internal/Http/HttpProtocol.cs | 6 +- .../Internal/Http/IHttpOutputProducer.cs | 4 +- .../Internal/Http2/Http2Connection.cs | 94 +- .../Internal/Http2/Http2Frame.WindowUpdate.cs | 2 +- src/Kestrel.Core/Internal/Http2/Http2Frame.cs | 4 +- .../Internal/Http2/Http2FrameWriter.cs | 223 +++-- .../Internal/Http2/Http2OutputFlowControl.cs | 77 ++ .../Http2/Http2OutputFlowControlAwaitable.cs | 48 + .../Internal/Http2/Http2OutputProducer.cs | 189 +++- .../Internal/Http2/Http2PeerSettings.cs | 5 +- ...tp2SettingsParameterOutOfRangeException.cs | 2 +- .../Internal/Http2/Http2Stream.cs | 25 +- .../Internal/Http2/Http2StreamContext.cs | 6 +- .../Http2/Http2StreamOutputFlowControl.cs | 96 ++ .../Internal/Http2/IHttp2FrameWriter.cs | 24 - .../Infrastructure/StreamSafePipeFlusher.cs | 80 ++ .../Infrastructure/ThreadPoolAwaitable.cs | 35 + .../Properties/CoreStrings.Designer.cs | 84 ++ .../Http2ConnectionTests.cs | 915 +++++++++++++++++- .../Http2/H2SpecTests.cs | 3 +- 22 files changed, 1795 insertions(+), 247 deletions(-) create mode 100644 src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs create mode 100644 src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs create mode 100644 src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs delete mode 100644 src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs create mode 100644 src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs create mode 100644 src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs diff --git a/src/Kestrel.Core/CoreStrings.resx b/src/Kestrel.Core/CoreStrings.resx index 0530d599a8..6b9ca5514d 100644 --- a/src/Kestrel.Core/CoreStrings.resx +++ b/src/Kestrel.Core/CoreStrings.resx @@ -524,4 +524,22 @@ For more information on configuring HTTPS see https://go.microsoft.com/fwlink/?l Invalid HTTP/2 connection preface. + + The connection or stream was aborted because a write operation was aborted with a CancellationToken. + + + The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size. + + + The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size. + + + The HTTP/2 connection faulted. + + + The client reset the request stream. + + + The request stream was aborted. + \ No newline at end of file diff --git a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs index 0fd62301bd..ce04650af2 100644 --- a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs @@ -4,12 +4,10 @@ using System; using System.Buffers; using System.IO.Pipelines; -using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; -using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; @@ -27,6 +25,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private readonly ITimeoutControl _timeoutControl; private readonly IKestrelTrace _log; private readonly IBytesWrittenFeature _transportBytesWrittenFeature; + private readonly StreamSafePipeFlusher _flusher; // This locks access to to all of the below fields private readonly object _contextLock = new object(); @@ -37,16 +36,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private long _totalBytesCommitted; private readonly PipeWriter _pipeWriter; - - // https://github.com/dotnet/corefxlab/issues/1334 - // Pipelines don't support multiple awaiters on flush - // this is temporary until it does - private TaskCompletionSource _flushTcs; - private readonly object _flushLock = new object(); - private Action _flushCompleted; - - private ValueTask _flushTask; - public Http1OutputProducer( PipeWriter pipeWriter, string connectionId, @@ -60,11 +49,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _connectionContext = connectionContext; _timeoutControl = timeoutControl; _log = log; - _flushCompleted = OnFlushCompleted; _transportBytesWrittenFeature = transportBytesWrittenFeature; + _flusher = new StreamSafePipeFlusher(pipeWriter, timeoutControl); } - public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default(CancellationToken)) + public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default) { if (cancellationToken.IsCancellationRequested) { @@ -74,12 +63,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http return WriteAsync(buffer, cancellationToken); } - public Task WriteStreamSuffixAsync(CancellationToken cancellationToken) + public Task WriteStreamSuffixAsync() { - return WriteAsync(_endChunkedResponseBytes.Span, cancellationToken); + return WriteAsync(_endChunkedResponseBytes.Span); } - public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + public Task FlushAsync(CancellationToken cancellationToken = default) { return WriteAsync(Constants.EmptyData, cancellationToken); } @@ -191,17 +180,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } } - public Task Write100ContinueAsync(CancellationToken cancellationToken) + public Task Write100ContinueAsync() { - return WriteAsync(_continueBytes.Span, default(CancellationToken)); + return WriteAsync(_continueBytes.Span); } private Task WriteAsync( ReadOnlySpan buffer, - CancellationToken cancellationToken) + CancellationToken cancellationToken = default) { - var writableBuffer = default(PipeWriter); - long bytesWritten = 0; lock (_contextLock) { if (_completed) @@ -209,8 +196,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http return Task.CompletedTask; } - writableBuffer = _pipeWriter; - var writer = new CountingBufferWriter(writableBuffer); + var writer = new CountingBufferWriter(_pipeWriter); if (buffer.Length > 0) { writer.Write(buffer); @@ -220,74 +206,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } writer.Commit(); - bytesWritten = _unflushedBytes; + var bytesWritten = _unflushedBytes; _unflushedBytes = 0; - } - return FlushAsync(writableBuffer, bytesWritten, cancellationToken); - } - - // Single caller, at end of method - so inline - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private Task FlushAsync(PipeWriter writableBuffer, long bytesWritten, CancellationToken cancellationToken) - { - var awaitable = writableBuffer.FlushAsync(cancellationToken); - if (awaitable.IsCompleted) - { - // The flush task can't fail today - return Task.CompletedTask; - } - return FlushAsyncAwaited(awaitable, bytesWritten, cancellationToken); - } - - private async Task FlushAsyncAwaited(ValueTask awaitable, long count, CancellationToken cancellationToken) - { - // https://github.com/dotnet/corefxlab/issues/1334 - // Since the flush awaitable doesn't currently support multiple awaiters - // we need to use a task to track the callbacks. - // All awaiters get the same task - lock (_flushLock) - { - _flushTask = awaitable; - if (_flushTcs == null || _flushTcs.Task.IsCompleted) - { - _flushTcs = new TaskCompletionSource(); - - _flushTask.GetAwaiter().OnCompleted(_flushCompleted); - } - } - - _timeoutControl.StartTimingWrite(count); - try - { - await _flushTcs.Task; - cancellationToken.ThrowIfCancellationRequested(); - } - catch (OperationCanceledException) - { - _completed = true; - throw; - } - finally - { - _timeoutControl.StopTimingWrite(); - } - } - - private void OnFlushCompleted() - { - try - { - _flushTask.GetAwaiter().GetResult(); - _flushTcs.TrySetResult(null); - } - catch (Exception exception) - { - _flushTcs.TrySetResult(exception); - } - finally - { - _flushTask = default; + return _flusher.FlushAsync(bytesWritten, this, cancellationToken); } } } diff --git a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs index 0c759c4451..e572f8b2c0 100644 --- a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs +++ b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs @@ -429,7 +429,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http /// /// Immediately kill the connection and poison the request and response streams with an error if there is one. /// - public void Abort(ConnectionAbortedException abortReason) + public virtual void Abort(ConnectionAbortedException abortReason) { if (Interlocked.Exchange(ref _requestAborted, 1) != 0) { @@ -966,7 +966,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http RequestHeaders.TryGetValue("Expect", out var expect) && (expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase)) { - Output.Write100ContinueAsync(default(CancellationToken)).GetAwaiter().GetResult(); + Output.Write100ContinueAsync().GetAwaiter().GetResult(); } } @@ -1097,7 +1097,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http // For the same reason we call CheckLastWrite() in Content-Length responses. _abortedCts = null; - await Output.WriteStreamSuffixAsync(default(CancellationToken)); + await Output.WriteStreamSuffixAsync(); if (_keepAlive) { diff --git a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs index 41dfdbbbec..6dbdaca7f4 100644 --- a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs @@ -14,12 +14,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http void Abort(ConnectionAbortedException abortReason); Task WriteAsync(Func callback, T state); Task FlushAsync(CancellationToken cancellationToken); - Task Write100ContinueAsync(CancellationToken cancellationToken); + Task Write100ContinueAsync(); void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders); // The reason this is ReadOnlySpan and not ReadOnlyMemory is because writes are always // synchronous. Flushing to get back pressure is the only time we truly go async but // that's after the buffer is copied Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken); - Task WriteStreamSuffixAsync(CancellationToken cancellationToken); + Task WriteStreamSuffixAsync(); } } diff --git a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs index 053388fb53..15c47b381c 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs @@ -61,6 +61,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly Http2ConnectionContext _context; private readonly Http2FrameWriter _frameWriter; private readonly HPackDecoder _hpackDecoder; + private readonly Http2OutputFlowControl _outputFlowControl = new Http2OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize); private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings(); private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings(); @@ -80,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public Http2Connection(Http2ConnectionContext context) { _context = context; - _frameWriter = new Http2FrameWriter(context.Transport.Output, context.Application.Input); + _frameWriter = new Http2FrameWriter(context.Transport.Output, context.Application.Input, _outputFlowControl, this); _hpackDecoder = new HPackDecoder((int)_serverSettings.HeaderTableSize); } @@ -95,7 +96,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public void OnInputOrOutputCompleted() { _stopping = true; - _frameWriter.Abort(ex: null); + _frameWriter.Complete(); } public void Abort(ConnectionAbortedException ex) @@ -143,7 +144,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 if (Http2FrameReader.ReadFrame(readableBuffer, _incomingFrame, _serverSettings.MaxFrameSize, out consumed, out examined)) { Log.LogTrace($"Connection id {ConnectionId} received {_incomingFrame.Type} frame with flags 0x{_incomingFrame.Flags:x} and length {_incomingFrame.Length} for stream ID {_incomingFrame.StreamId}"); - await ProcessFrameAsync(application); + await ProcessFrameAsync(application); } } else if (result.IsCompleted) @@ -151,6 +152,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 return; } } + catch (Http2StreamErrorException ex) + { + Log.Http2StreamError(ConnectionId, ex); + AbortStream(_incomingFrame.StreamId, new ConnectionAbortedException(ex.Message, ex)); + await _frameWriter.WriteRstStreamAsync(ex.StreamId, ex.ErrorCode); + } finally { Input.AdvanceTo(consumed, examined); @@ -187,20 +194,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } finally { + var connectionError = error as ConnectionAbortedException + ?? new ConnectionAbortedException(CoreStrings.Http2ConnectionFaulted, error); + try { foreach (var stream in _streams.Values) { - stream.Http2Abort(error); + stream.Abort(connectionError); } await _frameWriter.WriteGoAwayAsync(_highestOpenedStreamId, errorCode); _frameWriter.Complete(); } + catch + { + _frameWriter.Abort(connectionError); + throw; + } finally { Input.Complete(); - _frameWriter.Abort(ex: null); } } } @@ -296,7 +310,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 case Http2FrameType.DATA: return ProcessDataFrameAsync(); case Http2FrameType.HEADERS: - return ProcessHeadersFrameAsync(application); + return ProcessHeadersFrameAsync(application); case Http2FrameType.PRIORITY: return ProcessPriorityFrameAsync(); case Http2FrameType.RST_STREAM: @@ -312,7 +326,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 case Http2FrameType.WINDOW_UPDATE: return ProcessWindowUpdateFrameAsync(); case Http2FrameType.CONTINUATION: - return ProcessContinuationFrameAsync(application); + return ProcessContinuationFrameAsync(application); default: return ProcessUnknownFrameAsync(); } @@ -442,7 +456,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 LocalEndPoint = _context.LocalEndPoint, RemoteEndPoint = _context.RemoteEndPoint, StreamLifetimeHandler = this, - FrameWriter = _frameWriter + ClientPeerSettings = _clientSettings, + FrameWriter = _frameWriter, + ConnectionOutputFlowControl = _outputFlowControl, + TimeoutControl = this, }); if ((_incomingFrame.HeadersFlags & Http2HeadersFrameFlags.END_STREAM) == Http2HeadersFrameFlags.END_STREAM) @@ -500,11 +517,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } ThrowIfIncomingFrameSentToIdleStream(); - - if (_streams.TryGetValue(_incomingFrame.StreamId, out var stream)) - { - stream.Abort(abortReason: null); - } + AbortStream(_incomingFrame.StreamId, new ConnectionAbortedException(CoreStrings.Http2StreamResetByClient)); return Task.CompletedTask; } @@ -533,7 +546,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 try { + // ParseFrame will not parse an InitialWindowSize > int.MaxValue. + var previousInitialWindowSize = (int)_clientSettings.InitialWindowSize; + _clientSettings.ParseFrame(_incomingFrame); + + // This difference can be negative. + var windowSizeDifference = (int)_clientSettings.InitialWindowSize - previousInitialWindowSize; + + if (windowSizeDifference != 0) + { + foreach (var stream in _streams.Values) + { + if (!stream.TryUpdateOutputWindow(windowSizeDifference)) + { + // This means that this caused a stream window to become larger than int.MaxValue. + // This can never happen with a well behaved client and MUST be treated as a connection error. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2 + throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorInitialWindowSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR); + } + } + } + return _frameWriter.WriteSettingsAckAsync(); } catch (Http2SettingsParameterOutOfRangeException ex) @@ -620,6 +654,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorWindowUpdateIncrementZero, Http2ErrorCode.PROTOCOL_ERROR); } + if (_incomingFrame.StreamId == 0) + { + if (!_frameWriter.TryUpdateConnectionWindow(_incomingFrame.WindowUpdateSizeIncrement)) + { + throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorWindowUpdateSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR); + } + } + else if (_streams.TryGetValue(_incomingFrame.StreamId, out var stream)) + { + if (!stream.TryUpdateOutputWindow(_incomingFrame.WindowUpdateSizeIncrement)) + { + throw new Http2StreamErrorException(_incomingFrame.StreamId, CoreStrings.Http2ErrorWindowUpdateSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR); + } + } + else + { + // The stream was not found in the dictionary which means the stream was probably closed. This can + // happen when the client sends a window update for a stream right as the server closes the same stream + // Since this is an unavoidable race, we just ignore the window update frame. + } + return Task.CompletedTask; } @@ -669,11 +724,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 ResetRequestHeaderParsingState(); } } - catch (Http2StreamErrorException ex) + catch (Http2StreamErrorException) { - Log.Http2StreamError(ConnectionId, ex); ResetRequestHeaderParsingState(); - return _frameWriter.WriteRstStreamAsync(ex.StreamId, ex.ErrorCode); + throw; } return Task.CompletedTask; @@ -745,6 +799,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } } + private void AbortStream(int streamId, ConnectionAbortedException error) + { + if (_streams.TryGetValue(streamId, out var stream)) + { + stream.Abort(error); + } + } + void IHttp2StreamLifetimeHandler.OnStreamCompleted(int streamId) { _streams.TryRemove(streamId, out _); diff --git a/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs b/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs index 6958b376f5..deb06ac02a 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { public int WindowUpdateSizeIncrement { - get => ((Payload[0] << 24) | (Payload[1] << 16) | (Payload[2] << 16) | Payload[3]) & 0x7fffffff; + get => ((Payload[0] << 24) | (Payload[1] << 16) | (Payload[2] << 8) | Payload[3]) & 0x7fffffff; set { Payload[0] = (byte)(((uint)value >> 24) & 0x7f); diff --git a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs index 0d693c0ced..00e2968de1 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { public partial class Http2Frame { - public const int MinAllowedMaxFrameSize = 16 * 1024; + public const int MinAllowedMaxFrameSize = 16 * 1024; public const int MaxAllowedMaxFrameSize = 16 * 1024 * 1024 - 1; public const int HeaderLength = 9; @@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 get => _data[FlagsOffset]; set { - _data[FlagsOffset] = (byte)value; + _data[FlagsOffset] = value; } } diff --git a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs index df20e02638..f010264d78 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs @@ -7,13 +7,15 @@ using System.Collections.Generic; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { - public class Http2FrameWriter : IHttp2FrameWriter + public class Http2FrameWriter { // Literal Header Field without Indexing - Indexed Name (Index 8 - :status) private static readonly byte[] _continueBytes = new byte[] { 0x08, 0x03, (byte)'1', (byte)'0', (byte)'0' }; @@ -23,13 +25,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly HPackEncoder _hpackEncoder = new HPackEncoder(); private readonly PipeWriter _outputWriter; private readonly PipeReader _outputReader; + private readonly Http2OutputFlowControl _connectionOutputFlowControl; + private readonly StreamSafePipeFlusher _flusher; private bool _completed; - public Http2FrameWriter(PipeWriter outputPipeWriter, PipeReader outputPipeReader) + public Http2FrameWriter( + PipeWriter outputPipeWriter, + PipeReader outputPipeReader, + Http2OutputFlowControl connectionOutputFlowControl, + ITimeoutControl timeoutControl) { _outputWriter = outputPipeWriter; _outputReader = outputPipeReader; + + _connectionOutputFlowControl = connectionOutputFlowControl; + _flusher = new StreamSafePipeFlusher(_outputWriter, timeoutControl); } public void Complete() @@ -42,30 +53,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } _completed = true; + _connectionOutputFlowControl.Abort(); _outputWriter.Complete(); } } - public void Abort(Exception ex) + public void Abort(ConnectionAbortedException ex) + { + // TODO: Really abort the connection using the ConnectionContex like Http1OutputProducer. + _outputReader.CancelPendingRead(); + Complete(); + } + + public Task FlushAsync(IHttpOutputProducer outputProducer, CancellationToken cancellationToken) { lock (_writeLock) { if (_completed) { - return; + return Task.CompletedTask; } - _completed = true; - _outputReader.CancelPendingRead(); - _outputWriter.Complete(ex); - } - } - - public Task FlushAsync(CancellationToken cancellationToken) - { - lock (_writeLock) - { - return WriteAsync(Constants.EmptyData); + return _flusher.FlushAsync(0, outputProducer, cancellationToken); } } @@ -77,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _outgoingFrame.Length = _continueBytes.Length; _continueBytes.CopyTo(_outgoingFrame.HeadersPayload); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } @@ -85,6 +94,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { lock (_writeLock) { + if (_completed) + { + return; + } + _outgoingFrame.PrepareHeaders(Http2HeadersFrameFlags.NONE, streamId); var done = _hpackEncoder.BeginEncode(statusCode, EnumerateHeaders(headers), _outgoingFrame.Payload, out var payloadLength); @@ -95,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _outgoingFrame.HeadersFlags = Http2HeadersFrameFlags.END_HEADERS; } - Append(_outgoingFrame.Raw); + _outputWriter.Write(_outgoingFrame.Raw); while (!done) { @@ -109,51 +123,128 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _outgoingFrame.ContinuationFlags = Http2ContinuationFrameFlags.END_HEADERS; } - Append(_outgoingFrame.Raw); + _outputWriter.Write(_outgoingFrame.Raw); } } } - public Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken) - => WriteDataAsync(streamId, data, endStream: false, cancellationToken: cancellationToken); - - public Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken) + public Task WriteDataAsync(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence data, bool endStream) { - var tasks = new List(); + // The Length property of a ReadOnlySequence can be expensive, so we cache the value. + var dataLength = data.Length; lock (_writeLock) { - _outgoingFrame.PrepareData(streamId); - - while (data.Length > _outgoingFrame.Length) + if (_completed || flowControl.IsAborted) { - data.Slice(0, _outgoingFrame.Length).CopyTo(_outgoingFrame.Payload); - data = data.Slice(_outgoingFrame.Length); - - tasks.Add(WriteAsync(_outgoingFrame.Raw, cancellationToken)); + return Task.CompletedTask; } - _outgoingFrame.Length = data.Length; - - if (endStream) + // Zero-length data frames are allowed to be sent immediately even if there is no space available in the flow control window. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 + if (dataLength != 0 && dataLength > flowControl.Available) { - _outgoingFrame.DataFlags = Http2DataFrameFlags.END_STREAM; + return WriteDataAsyncAwaited(streamId, flowControl, data, dataLength, endStream); } - data.CopyTo(_outgoingFrame.Payload); - - tasks.Add(WriteAsync(_outgoingFrame.Raw, cancellationToken)); - - return Task.WhenAll(tasks); + // This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window. + flowControl.Advance((int)dataLength); + return WriteDataUnsynchronizedAsync(streamId, data, endStream); } } + private Task WriteDataUnsynchronizedAsync(int streamId, ReadOnlySequence data, bool endStream) + { + _outgoingFrame.PrepareData(streamId); + + var payload = _outgoingFrame.Payload; + var unwrittenPayloadLength = 0; + + foreach (var buffer in data) + { + var current = buffer; + + while (current.Length > payload.Length) + { + current.Span.Slice(0, payload.Length).CopyTo(payload); + current = current.Slice(payload.Length); + + _outputWriter.Write(_outgoingFrame.Raw); + payload = _outgoingFrame.Payload; + unwrittenPayloadLength = 0; + } + + if (current.Length > 0) + { + current.Span.CopyTo(payload); + payload = payload.Slice(current.Length); + unwrittenPayloadLength += current.Length; + } + } + + if (endStream) + { + _outgoingFrame.DataFlags = Http2DataFrameFlags.END_STREAM; + } + + _outgoingFrame.Length = unwrittenPayloadLength; + _outputWriter.Write(_outgoingFrame.Raw); + + return FlushUnsynchronizedAsync(); + } + + private async Task WriteDataAsyncAwaited(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool endStream) + { + while (dataLength > 0) + { + Http2OutputFlowControlAwaitable availabilityAwaitable; + var writeTask = Task.CompletedTask; + + lock (_writeLock) + { + if (_completed || flowControl.IsAborted) + { + break; + } + + var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable); + + if (actual > 0) + { + if (actual < dataLength) + { + writeTask = WriteDataUnsynchronizedAsync(streamId, data.Slice(0, actual), endStream: false); + data = data.Slice(actual); + dataLength -= actual; + } + else + { + writeTask = WriteDataUnsynchronizedAsync(streamId, data, endStream); + dataLength = 0; + } + } + } + + // This awaitable releases continuations in FIFO order when the window updates. + // It should be very rare for a continuation to run without any availability. + if (availabilityAwaitable != null) + { + await availabilityAwaitable; + } + + await writeTask; + } + + // Ensure that the application continuation isn't executed inline by ProcessWindowUpdateFrameAsync. + await ThreadPoolAwaitable.Instance; + } + public Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode) { lock (_writeLock) { _outgoingFrame.PrepareRstStream(streamId, errorCode); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } @@ -163,7 +254,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { // TODO: actually send settings _outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.NONE); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } @@ -172,7 +263,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 lock (_writeLock) { _outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.ACK); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } @@ -182,7 +273,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { _outgoingFrame.PreparePing(Http2PingFrameFlags.ACK); payload.CopyTo(_outgoingFrame.Payload); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } @@ -191,23 +282,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 lock (_writeLock) { _outgoingFrame.PrepareGoAway(lastStreamId, errorCode); - return WriteAsync(_outgoingFrame.Raw); + return WriteUnsynchronizedAsync(_outgoingFrame.Raw); } } - // Must be called with _writeLock - private void Append(ReadOnlySpan data) - { - if (_completed) - { - return; - } - - _outputWriter.Write(data); - } - - // Must be called with _writeLock - private Task WriteAsync(ReadOnlySpan data, CancellationToken cancellationToken = default(CancellationToken)) + private Task WriteUnsynchronizedAsync(ReadOnlySpan data) { if (_completed) { @@ -215,12 +294,36 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } _outputWriter.Write(data); - return FlushAsync(_outputWriter, cancellationToken); + return FlushUnsynchronizedAsync(); } - private async Task FlushAsync(PipeWriter outputWriter, CancellationToken cancellationToken) + private Task FlushUnsynchronizedAsync() { - await outputWriter.FlushAsync(cancellationToken); + return _flusher.FlushAsync(); + } + + public bool TryUpdateConnectionWindow(int bytes) + { + lock (_writeLock) + { + return _connectionOutputFlowControl.TryUpdateWindow(bytes); + } + } + + public bool TryUpdateStreamWindow(Http2StreamOutputFlowControl flowControl, int bytes) + { + lock (_writeLock) + { + return flowControl.TryUpdateWindow(bytes); + } + } + + public void AbortPendingStreamDataWrites(Http2StreamOutputFlowControl flowControl) + { + lock (_writeLock) + { + flowControl.Abort(); + } } private static IEnumerable> EnumerateHeaders(IHeaderDictionary headers) diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs new file mode 100644 index 0000000000..4e697cb103 --- /dev/null +++ b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs @@ -0,0 +1,77 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Diagnostics; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 +{ + public class Http2OutputFlowControl + { + private readonly Queue _awaitableQueue = new Queue(); + + public Http2OutputFlowControl(uint initialWindowSize) + { + Debug.Assert(initialWindowSize <= Http2PeerSettings.MaxWindowSize, $"{nameof(initialWindowSize)} too large."); + + Available = (int)initialWindowSize; + } + + public int Available { get; private set; } + public bool IsAborted { get; private set; } + + public Http2OutputFlowControlAwaitable AvailabilityAwaitable + { + get + { + Debug.Assert(!IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort."); + Debug.Assert(Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available."); + + var awaitable = new Http2OutputFlowControlAwaitable(); + _awaitableQueue.Enqueue(awaitable); + return awaitable; + } + } + + public void Advance(int bytes) + { + Debug.Assert(!IsAborted, $"({nameof(Advance)} called after abort."); + Debug.Assert(bytes == 0 || (bytes > 0 && bytes <= Available), $"{nameof(Advance)}({bytes}) called with {Available} bytes available."); + + Available -= bytes; + } + + // bytes can be negative when SETTINGS_INITIAL_WINDOW_SIZE decreases mid-connection. + // This can also cause Available to become negative which MUST be allowed. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2 + public bool TryUpdateWindow(int bytes) + { + var maxUpdate = Http2PeerSettings.MaxWindowSize - Available; + + if (bytes > maxUpdate) + { + return false; + } + + Available += bytes; + + while (Available > 0 && _awaitableQueue.Count > 0) + { + var awaitable = _awaitableQueue.Dequeue(); + awaitable.Complete(); + } + + return true; + } + + public void Abort() + { + IsAborted = true; + + while (_awaitableQueue.Count > 0) + { + _awaitableQueue.Dequeue().Complete(); + } + } + } +} diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs new file mode 100644 index 0000000000..a83a0f9b09 --- /dev/null +++ b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs @@ -0,0 +1,48 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 +{ + public class Http2OutputFlowControlAwaitable : ICriticalNotifyCompletion + { + private static readonly Action _callbackCompleted = () => { }; + + private Action _callback; + + public Http2OutputFlowControlAwaitable GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + + public void GetResult() + { + Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); + + _callback = null; + } + + public void OnCompleted(Action continuation) + { + if (ReferenceEquals(_callback, _callbackCompleted) || + ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) + { + continuation(); + } + } + + public void UnsafeOnCompleted(Action continuation) + { + OnCompleted(continuation); + } + + public void Complete() + { + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + + continuation?.Invoke(); + } + } +} diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs index e701654d1e..03babbedeb 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs @@ -2,33 +2,79 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; +using System.Diagnostics; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { public class Http2OutputProducer : IHttpOutputProducer { private readonly int _streamId; - private readonly IHttp2FrameWriter _frameWriter; + private readonly Http2FrameWriter _frameWriter; + private readonly StreamSafePipeFlusher _flusher; - public Http2OutputProducer(int streamId, IHttp2FrameWriter frameWriter) + // This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the + // FrameWriter's connection-level write lock. + private readonly Http2StreamOutputFlowControl _flowControl; + + private readonly object _dataWriterLock = new object(); + private readonly Pipe _dataPipe; + private readonly Task _dataWriteProcessingTask; + private bool _startedWritingDataFrames; + private bool _completed; + private bool _disposed; + + public Http2OutputProducer( + int streamId, + Http2FrameWriter frameWriter, + Http2StreamOutputFlowControl flowControl, + ITimeoutControl timeoutControl, + MemoryPool pool) { _streamId = streamId; _frameWriter = frameWriter; + _flowControl = flowControl; + _dataPipe = CreateDataPipe(pool); + _flusher = new StreamSafePipeFlusher(_dataPipe.Writer, timeoutControl); + _dataWriteProcessingTask = ProcessDataWrites(); } public void Dispose() { + lock (_dataWriterLock) + { + if (_disposed) + { + return; + } + + _disposed = true; + + if (!_completed) + { + _completed = true; + + // Complete with an exception to prevent an end of stream data frame from being sent without an + // explicit call to WriteStreamSuffixAsync. ConnectionAbortedExceptions are swallowed, so the + // message doesn't matter + _dataPipe.Writer.Complete(new ConnectionAbortedException()); + } + + _frameWriter.AbortPendingStreamDataWrites(_flowControl); + } } - public void Abort(ConnectionAbortedException error) + public void Abort(ConnectionAbortedException abortReason) { // TODO: RST_STREAM? + Dispose(); } public Task WriteAsync(Func callback, T state) @@ -36,23 +82,142 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 throw new NotImplementedException(); } - public Task FlushAsync(CancellationToken cancellationToken) => _frameWriter.FlushAsync(cancellationToken); - - public Task Write100ContinueAsync(CancellationToken cancellationToken) => _frameWriter.Write100ContinueAsync(_streamId); - - public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken) + public Task FlushAsync(CancellationToken cancellationToken) { - return _frameWriter.WriteDataAsync(_streamId, data, cancellationToken); + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + lock (_dataWriterLock) + { + if (_completed) + { + return Task.CompletedTask; + } + + if (_startedWritingDataFrames) + { + // If there's already been response data written to the stream, just wait for that. Any header + // should be in front of the data frames in the connection pipe. Trailers could change things. + return _flusher.FlushAsync(0, this, cancellationToken); + } + else + { + // Flushing the connection pipe ensures headers already in the pipe are flushed even if no data + // frames have been written. + return _frameWriter.FlushAsync(this, cancellationToken); + } + } } - public Task WriteStreamSuffixAsync(CancellationToken cancellationToken) + public Task Write100ContinueAsync() { - return _frameWriter.WriteDataAsync(_streamId, Constants.EmptyData, endStream: true, cancellationToken: cancellationToken); + lock (_dataWriterLock) + { + if (_completed) + { + return Task.CompletedTask; + } + + return _frameWriter.Write100ContinueAsync(_streamId); + } } public void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders) { - _frameWriter.WriteResponseHeaders(_streamId, statusCode, responseHeaders); + lock (_dataWriterLock) + { + if (_completed) + { + return; + } + + // The HPACK header compressor is stateful, if we compress headers for an aborted stream we must send them. + // Optimize for not compressing or sending them. + _frameWriter.WriteResponseHeaders(_streamId, statusCode, responseHeaders); + } } + + public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + lock (_dataWriterLock) + { + // This length check is important because we don't want to set _startedWritingDataFrames unless a data + // frame will actually be written causing the headers to be flushed. + if (_completed || data.Length == 0) + { + return Task.CompletedTask; + } + + _startedWritingDataFrames = true; + + _dataPipe.Writer.Write(data); + return _flusher.FlushAsync(data.Length, this, cancellationToken); + } + } + + public Task WriteStreamSuffixAsync() + { + lock (_dataWriterLock) + { + if (_completed) + { + return Task.CompletedTask; + } + + _completed = true; + + // Even if there's no actual data, completing the writer gracefully sends an END_STREAM DATA frame. + _startedWritingDataFrames = true; + + _dataPipe.Writer.Complete(); + return _dataWriteProcessingTask; + } + } + + private async Task ProcessDataWrites() + { + try + { + ReadResult readResult; + + do + { + readResult = await _dataPipe.Reader.ReadAsync(); + + await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted); + + _dataPipe.Reader.AdvanceTo(readResult.Buffer.End); + } while (!readResult.IsCompleted); + } + catch (ConnectionAbortedException) + { + // Writes should not throw for aborted connections. + } + catch (Exception ex) + { + Debug.Assert(false, ex.ToString()); + } + + _dataPipe.Reader.Complete(); + } + + private static Pipe CreateDataPipe(MemoryPool pool) + => new Pipe(new PipeOptions + ( + pool: pool, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + useSynchronizationContext: false, + minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize + )); } } diff --git a/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs b/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs index 4bf4787435..fcf78c4b42 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs @@ -14,6 +14,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public const uint DefaultInitialWindowSize = 65535; public const uint DefaultMaxFrameSize = 16384; public const uint DefaultMaxHeaderListSize = uint.MaxValue; + public const uint MaxWindowSize = int.MaxValue; public uint HeaderTableSize { get; set; } = DefaultHeaderTableSize; @@ -59,11 +60,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 MaxConcurrentStreams = value; break; case Http2SettingsParameter.SETTINGS_INITIAL_WINDOW_SIZE: - if (value > int.MaxValue) + if (value > MaxWindowSize) { throw new Http2SettingsParameterOutOfRangeException(Http2SettingsParameter.SETTINGS_INITIAL_WINDOW_SIZE, lowerBound: 0, - upperBound: int.MaxValue); + upperBound: MaxWindowSize); } InitialWindowSize = value; diff --git a/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs b/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs index 95db1c9d58..fcaca923b0 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { public class Http2SettingsParameterOutOfRangeException : Exception { - public Http2SettingsParameterOutOfRangeException(Http2SettingsParameter parameter, uint lowerBound, uint upperBound) + public Http2SettingsParameterOutOfRangeException(Http2SettingsParameter parameter, long lowerBound, long upperBound) : base($"HTTP/2 SETTINGS parameter {parameter} must be set to a value between {lowerBound} and {upperBound}") { Parameter = parameter; diff --git a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs index 702cd636d5..d47a78be13 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs @@ -3,13 +3,12 @@ using System; using System.Buffers; +using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; -using Microsoft.AspNetCore.Http.Features; -using Microsoft.AspNetCore.Server.Kestrel.Core.Features; +using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; -using Microsoft.Extensions.Primitives; using Microsoft.Net.Http.Headers; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 @@ -17,13 +16,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public partial class Http2Stream : HttpProtocol { private readonly Http2StreamContext _context; + private readonly Http2StreamOutputFlowControl _outputFlowControl; public Http2Stream(Http2StreamContext context) : base(context) { _context = context; + _outputFlowControl = new Http2StreamOutputFlowControl(context.ConnectionOutputFlowControl, context.ClientPeerSettings.InitialWindowSize); - Output = new Http2OutputProducer(StreamId, _context.FrameWriter); + Output = new Http2OutputProducer(context.StreamId, context.FrameWriter, _outputFlowControl, context.TimeoutControl, context.MemoryPool); } public int StreamId => _context.StreamId; @@ -143,15 +144,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } } - // TODO: The HTTP/2 tests expect the request and response streams to be aborted with - // non-ConnectionAbortedExceptions. The abortReasons can include things like - // Http2ConnectionErrorException which don't derive from IOException or - // OperationCanceledException. This is probably not a good idea. - public void Http2Abort(Exception abortReason) + public override void Abort(ConnectionAbortedException abortReason) { - _streams?.Abort(abortReason); + base.Abort(abortReason); - OnInputOrOutputCompleted(); + // Unblock the request body. + RequestBodyPipe.Writer.Complete(new IOException(CoreStrings.Http2StreamAborted, abortReason)); + } + + public bool TryUpdateOutputWindow(int bytes) + { + return _context.FrameWriter.TryUpdateStreamWindow(_outputFlowControl, bytes); } } } diff --git a/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs b/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs index eea80103a7..d8d24144a2 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs @@ -5,6 +5,7 @@ using System.Buffers; using System.Net; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { @@ -18,6 +19,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public IPEndPoint RemoteEndPoint { get; set; } public IPEndPoint LocalEndPoint { get; set; } public IHttp2StreamLifetimeHandler StreamLifetimeHandler { get; set; } - public IHttp2FrameWriter FrameWriter { get; set; } + public Http2PeerSettings ClientPeerSettings { get; set; } + public Http2FrameWriter FrameWriter { get; set; } + public Http2OutputFlowControl ConnectionOutputFlowControl { get; set; } + public ITimeoutControl TimeoutControl { get; set; } } } diff --git a/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs b/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs new file mode 100644 index 0000000000..677d821379 --- /dev/null +++ b/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs @@ -0,0 +1,96 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 +{ + public class Http2StreamOutputFlowControl + { + private readonly Http2OutputFlowControl _connectionLevelFlowControl; + private readonly Http2OutputFlowControl _streamLevelFlowControl; + + private Http2OutputFlowControlAwaitable _currentConnectionLevelAwaitable; + + public Http2StreamOutputFlowControl(Http2OutputFlowControl connectionLevelFlowControl, uint initialWindowSize) + { + _connectionLevelFlowControl = connectionLevelFlowControl; + _streamLevelFlowControl = new Http2OutputFlowControl(initialWindowSize); + } + + public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available); + + public bool IsAborted => _connectionLevelFlowControl.IsAborted || _streamLevelFlowControl.IsAborted; + + public void Advance(int bytes) + { + _connectionLevelFlowControl.Advance(bytes); + _streamLevelFlowControl.Advance(bytes); + } + + public int AdvanceUpToAndWait(long bytes, out Http2OutputFlowControlAwaitable awaitable) + { + var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available + ? _connectionLevelFlowControl : _streamLevelFlowControl; + + // Clamp ~= Math.Clamp from netcoreapp >= 2.0 + var actual = Clamp(leastAvailableFlow.Available, 0, bytes); + + // Make sure to advance prior to accessing AvailabilityAwaitable. + _connectionLevelFlowControl.Advance(actual); + _streamLevelFlowControl.Advance(actual); + + awaitable = null; + _currentConnectionLevelAwaitable = null; + + if (actual < bytes) + { + awaitable = leastAvailableFlow.AvailabilityAwaitable; + + if (leastAvailableFlow == _connectionLevelFlowControl) + { + _currentConnectionLevelAwaitable = awaitable; + } + } + + return actual; + } + + // The connection-level update window is updated independently. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 + public bool TryUpdateWindow(int bytes) + { + return _streamLevelFlowControl.TryUpdateWindow(bytes); + } + + public void Abort() + { + _streamLevelFlowControl.Abort(); + + // If this stream is waiting on a connection-level window update, complete this stream's + // connection-level awaitable so the stream abort is observed immediately. + // This could complete an awaitable still sitting in the connection-level awaitable queue, + // but this is safe because completing it again will just no-op. + _currentConnectionLevelAwaitable?.Complete(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int Clamp(int value, int min, long max) + { + Debug.Assert(min <= max, $"{nameof(Clamp)} called with a min greater than the max."); + + if (value < min) + { + return min; + } + else if (value > max) + { + return (int)max; + } + + return value; + } + } +} diff --git a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs deleted file mode 100644 index aa7b23330b..0000000000 --- a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Http; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 -{ - public interface IHttp2FrameWriter - { - void Abort(Exception error); - Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)); - Task Write100ContinueAsync(int streamId); - void WriteResponseHeaders(int streamId, int statusCode, IHeaderDictionary headers); - Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken); - Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken); - Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode); - Task WriteSettingsAckAsync(); - Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySpan payload); - Task WriteGoAwayAsync(int lastStreamId, Http2ErrorCode errorCode); - } -} diff --git a/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs b/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs new file mode 100644 index 0000000000..0caeda11ad --- /dev/null +++ b/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs @@ -0,0 +1,80 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure +{ + /// + /// This wraps PipeWriter.FlushAsync() in a way that allows multiple awaiters making it safe to call from publicly + /// exposed Stream implementations. + /// + public class StreamSafePipeFlusher + { + private readonly PipeWriter _writer; + private readonly ITimeoutControl _timeoutControl; + private readonly object _flushLock = new object(); + + private Task _lastFlushTask = Task.CompletedTask; + + public StreamSafePipeFlusher( + PipeWriter writer, + ITimeoutControl timeoutControl) + { + _writer = writer; + _timeoutControl = timeoutControl; + } + + public Task FlushAsync(long count = 0, IHttpOutputProducer outputProducer = null, CancellationToken cancellationToken = default) + { + var flushValueTask = _writer.FlushAsync(cancellationToken); + + if (flushValueTask.IsCompletedSuccessfully) + { + return Task.CompletedTask; + } + + // https://github.com/dotnet/corefxlab/issues/1334 + // Pipelines don't support multiple awaiters on flush. + // While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes, + // it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case, + // we find previous flush Task which still accounts for any newly committed bytes and await that. + lock (_flushLock) + { + if (_lastFlushTask.IsCompleted) + { + _lastFlushTask = flushValueTask.AsTask(); + } + + return TimeFlushAsync(count, outputProducer, cancellationToken); + } + } + + private async Task TimeFlushAsync(long count, IHttpOutputProducer outputProducer, CancellationToken cancellationToken) + { + _timeoutControl.StartTimingWrite(count); + + try + { + await _lastFlushTask; + } + catch (OperationCanceledException ex) + { + outputProducer.Abort(new ConnectionAbortedException(CoreStrings.ConnectionOrStreamAbortedByCancellationToken, ex)); + } + catch + { + // A canceled token is the only reason flush should ever throw. + } + + _timeoutControl.StopTimingWrite(); + + cancellationToken.ThrowIfCancellationRequested(); + } + } +} diff --git a/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs b/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs new file mode 100644 index 0000000000..1b1e080c97 --- /dev/null +++ b/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs @@ -0,0 +1,35 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 +{ + public class ThreadPoolAwaitable : ICriticalNotifyCompletion + { + public static ThreadPoolAwaitable Instance = new ThreadPoolAwaitable(); + + private ThreadPoolAwaitable() + { + } + + public ThreadPoolAwaitable GetAwaiter() => this; + public bool IsCompleted => false; + + public void GetResult() + { + } + + public void OnCompleted(Action continuation) + { + ThreadPool.QueueUserWorkItem(state => ((Action)state)(), continuation); + } + + public void UnsafeOnCompleted(Action continuation) + { + OnCompleted(continuation); + } + } +} diff --git a/src/Kestrel.Core/Properties/CoreStrings.Designer.cs b/src/Kestrel.Core/Properties/CoreStrings.Designer.cs index 29d9ac9bb3..6e6f832906 100644 --- a/src/Kestrel.Core/Properties/CoreStrings.Designer.cs +++ b/src/Kestrel.Core/Properties/CoreStrings.Designer.cs @@ -1904,6 +1904,90 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core internal static string FormatHttp2ErrorInvalidPreface() => GetString("Http2ErrorInvalidPreface"); + /// + /// The connection or stream was aborted because a write operation was aborted with a CancellationToken. + /// + internal static string ConnectionOrStreamAbortedByCancellationToken + { + get => GetString("ConnectionOrStreamAbortedByCancellationToken"); + } + + /// + /// The connection or stream was aborted because a write operation was aborted with a CancellationToken. + /// + internal static string FormatConnectionOrStreamAbortedByCancellationToken() + => GetString("ConnectionOrStreamAbortedByCancellationToken"); + + /// + /// The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size. + /// + internal static string Http2ErrorInitialWindowSizeInvalid + { + get => GetString("Http2ErrorInitialWindowSizeInvalid"); + } + + /// + /// The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size. + /// + internal static string FormatHttp2ErrorInitialWindowSizeInvalid() + => GetString("Http2ErrorInitialWindowSizeInvalid"); + + /// + /// The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size. + /// + internal static string Http2ErrorWindowUpdateSizeInvalid + { + get => GetString("Http2ErrorWindowUpdateSizeInvalid"); + } + + /// + /// The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size. + /// + internal static string FormatHttp2ErrorWindowUpdateSizeInvalid() + => GetString("Http2ErrorWindowUpdateSizeInvalid"); + + /// + /// The HTTP/2 connection faulted. + /// + internal static string Http2ConnectionFaulted + { + get => GetString("Http2ConnectionFaulted"); + } + + /// + /// The HTTP/2 connection faulted. + /// + internal static string FormatHttp2ConnectionFaulted() + => GetString("Http2ConnectionFaulted"); + + /// + /// The client reset the request stream. + /// + internal static string Http2StreamResetByClient + { + get => GetString("Http2StreamResetByClient"); + } + + /// + /// The client reset the request stream. + /// + internal static string FormatHttp2StreamResetByClient() + => GetString("Http2StreamResetByClient"); + + /// + /// The request stream was aborted. + /// + internal static string Http2StreamAborted + { + get => GetString("Http2StreamAborted"); + } + + /// + /// The request stream was aborted. + /// + internal static string FormatHttp2StreamAborted() + => GetString("Http2StreamAborted"); + private static string GetString(string name, params string[] formatterNames) { var value = _resourceManager.GetString(name); diff --git a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs index fae8ffb2fb..e6871565ab 100644 --- a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs +++ b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs @@ -125,6 +125,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests private readonly RequestDelegate _largeHeadersApplication; private readonly RequestDelegate _waitForAbortApplication; private readonly RequestDelegate _waitForAbortFlushingApplication; + private readonly RequestDelegate _waitForAbortWithDataApplication; private Task _connectionTask; @@ -269,6 +270,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests _runningStreams[streamIdFeature.StreamId].TrySetResult(null); }; + _waitForAbortWithDataApplication = async context => + { + var streamIdFeature = context.Features.Get(); + var sem = new SemaphoreSlim(0); + + context.RequestAborted.Register(() => + { + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamIdFeature.StreamId); + } + + sem.Release(); + }); + + await sem.WaitAsync().DefaultTimeout(); + + await context.Response.Body.WriteAsync(new byte[10], 0, 10); + + _runningStreams[streamIdFeature.StreamId].TrySetResult(null); + }; + _hpackDecoder = new HPackDecoder((int)_clientSettings.HeaderTableSize); _logger = new TestApplicationErrorLogger(); @@ -465,6 +488,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests withFlags: (byte)Http2DataFrameFlags.END_STREAM, withStreamId: 3); + // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop. + // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing + // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to + // acquire the _writeLock. + await ThreadPoolAwaitable.Instance; + await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); Assert.Equal(stream1DataFrame1.DataPayload, _helloBytes); @@ -536,6 +565,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests withFlags: (byte)Http2DataFrameFlags.END_STREAM, withStreamId: 1); + // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop. + // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing + // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to + // acquire the _writeLock. + await ThreadPoolAwaitable.Instance; + await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); } @@ -752,6 +787,122 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests expectedErrorMessage: CoreStrings.FormatHttp2ErrorStreamClosed(Http2FrameType.DATA, streamId: 1)); } + [Fact] + public async Task DATA_Sent_DespiteConnectionBackpressure_IfEmptyAndEndsStream() + { + // Zero-length data frames are allowed to be sent even if there is no space available in the flow control window. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 + + var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length; + var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length; + var remainingBytesAfterBackpressure = _maxData.Length - remainingBytesBeforeBackpressure; + + // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test. + _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2; + + await InitializeConnectionAsync(async context => + { + var streamId = context.Features.Get().StreamId; + + try + { + if (streamId == 1) + { + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure + 1; i++) + { + await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length); + } + } + + _runningStreams[streamId].SetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamId].SetException(ex); + throw; + } + }); + + // Start one stream that consumes the entire connection output window. + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await ExpectAsync(Http2FrameType.DATA, + withLength: _maxData.Length, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + } + + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesBeforeBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + // Start one more stream that receives an empty response despite connection backpressure. + await StartStreamAsync(3, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 55, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 3); + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 3); + + // Relieve connection backpressure to receive the rest of the first streams body. + await SendWindowUpdateAsync(0, remainingBytesAfterBackpressure); + + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesAfterBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop. + // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing + // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to + // acquire the _writeLock. + await ThreadPoolAwaitable.Instance; + + await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); + await WaitForAllStreamsAsync(); + } + + [Fact] + public async Task DATA_Sent_DespiteStreamBackpressure_IfEmptyAndEndsStream() + { + // Zero-length data frames are allowed to be sent even if there is no space available in the flow control window. + // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 + + // This only affects the stream windows. The connection-level window is always initialized at 64KiB. + _clientSettings.InitialWindowSize = 0; + + await InitializeConnectionAsync(_noopApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 55, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + } + [Fact] public async Task HEADERS_Received_Decoded() { @@ -1197,7 +1348,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headerBlock); await WaitForStreamErrorAsync( - ignoreNonRstStreamFrames: false, expectedStreamId: 1, expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR, expectedErrorMessage: CoreStrings.Http2ErrorHeaderNameUppercase); @@ -1286,7 +1436,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headers); await WaitForStreamErrorAsync( - ignoreNonRstStreamFrames: false, expectedStreamId: 1, expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR, expectedErrorMessage: expectedErrorMessage); @@ -1740,33 +1889,351 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests await StartStreamAsync(1, _browserRequestHeaders, endStream: true); await SendRstStreamAsync(1); - - // No data is received from the stream since it was aborted before writing anything - - await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); - await WaitForAllStreamsAsync(); Assert.Contains(1, _abortedStreamIds); + + await SendGoAwayAsync(); + + // No data is received from the stream since it was aborted before writing anything + await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + // TODO: Check logs } [Fact] - public async Task RST_STREAM_Received_AbortsStream_FlushedDataIsSent() + public async Task RST_STREAM_Received_AbortsStream_FlushedHeadersNotSent() { await InitializeConnectionAsync(_waitForAbortFlushingApplication); await StartStreamAsync(1, _browserRequestHeaders, endStream: true); await SendRstStreamAsync(1); + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + + await SendGoAwayAsync(); + + // No END_STREAM HEADERS or DATA frame is received since the stream was aborted + await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + // TODO: Check logs + } + + [Fact] + public async Task RST_STREAM_Received_AbortsStream_FlushedDataNotSent() + { + await InitializeConnectionAsync(_waitForAbortWithDataApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + await SendRstStreamAsync(1); + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + + await SendGoAwayAsync(); + + // No END_STREAM HEADERS or DATA frame is received since the stream was aborted + await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + // TODO: Check logs + } + + [Fact] + public async Task RST_STREAM_Received_RelievesConnectionBackpressure() + { + var writeTasks = new Task[4]; + + var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length; + var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length; + + // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test. + _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2; + + await InitializeConnectionAsync(async context => + { + var streamId = context.Features.Get().StreamId; + + var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + context.RequestAborted.Register(() => + { + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamId); + abortedTcs.SetResult(null); + } + }); + + try + { + writeTasks[streamId] = writeTcs.Task; + + // Flush headers even if the body can't yet be written because of flow control. + await context.Response.Body.FlushAsync(); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length); + } + + await context.Response.Body.WriteAsync(_maxData, 0, remainingBytesBeforeBackpressure + 1); + + writeTcs.SetResult(null); + + await abortedTcs.Task; + + _runningStreams[streamId].SetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamId].SetException(ex); + throw; + } + }); + + // Start one stream that consumes the entire connection output window. + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); await ExpectAsync(Http2FrameType.HEADERS, withLength: 37, withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, withStreamId: 1); - // No END_STREAM DATA frame is received since the stream was aborted + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await ExpectAsync(Http2FrameType.DATA, + withLength: _maxData.Length, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + } - await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesBeforeBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + // Ensure connection-level backpressure was hit. + Assert.False(writeTasks[1].IsCompleted); + + // Start another stream that immediately experiences backpressure. + await StartStreamAsync(3, _browserRequestHeaders, endStream: true); + + // The headers, but not the data for stream 3, can be sent prior to any window updates. + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 3); + + await SendRstStreamAsync(1); + // Any paused writes for stream 1 should complete after an RST_STREAM + // even without any preceeding window updates. + await _runningStreams[1].Task.DefaultTimeout(); + + // A connection-level window update allows the non-reset stream to continue. + await SendWindowUpdateAsync(0, (int)Http2PeerSettings.DefaultInitialWindowSize); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await ExpectAsync(Http2FrameType.DATA, + withLength: _maxData.Length, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 3); + } + + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesBeforeBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 3); + + Assert.False(writeTasks[3].IsCompleted); + + await SendRstStreamAsync(3); + await _runningStreams[3].Task.DefaultTimeout(); + + await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); + + await WaitForAllStreamsAsync(); Assert.Contains(1, _abortedStreamIds); + Assert.Contains(3, _abortedStreamIds); + } + + [Fact] + public async Task RST_STREAM_Received_RelievesStreamBackpressure() + { + var writeTasks = new Task[6]; + var initialWindowSize = _helloWorldBytes.Length / 2; + + // This only affects the stream windows. The connection-level window is always initialized at 64KiB. + _clientSettings.InitialWindowSize = (uint)initialWindowSize; + + await InitializeConnectionAsync(async context => + { + var streamId = context.Features.Get().StreamId; + + var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + context.RequestAborted.Register(() => + { + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamId); + abortedTcs.SetResult(null); + } + }); + + try + { + writeTasks[streamId] = writeTcs.Task; + await context.Response.Body.WriteAsync(_helloWorldBytes, 0, _helloWorldBytes.Length); + writeTcs.SetResult(null); + + await abortedTcs.Task; + + _runningStreams[streamId].SetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamId].SetException(ex); + throw; + } + }); + + async Task VerifyStreamBackpressure(int streamId) + { + await StartStreamAsync(streamId, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: streamId); + + var dataFrame = await ExpectAsync(Http2FrameType.DATA, + withLength: initialWindowSize, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: streamId); + + Assert.Equal(dataFrame.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize)); + Assert.False(writeTasks[streamId].IsCompleted); + } + + await VerifyStreamBackpressure(1); + await VerifyStreamBackpressure(3); + await VerifyStreamBackpressure(5); + + await SendRstStreamAsync(1); + await writeTasks[1].DefaultTimeout(); + Assert.False(writeTasks[3].IsCompleted); + Assert.False(writeTasks[5].IsCompleted); + + await SendRstStreamAsync(3); + await writeTasks[3].DefaultTimeout(); + Assert.False(writeTasks[5].IsCompleted); + + await SendRstStreamAsync(5); + await writeTasks[5].DefaultTimeout(); + + await StopConnectionAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false); + + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + Assert.Contains(3, _abortedStreamIds); + Assert.Contains(5, _abortedStreamIds); + } + + [Fact] + public async Task RST_STREAM_WaitingForRequestBody_RequestBodyThrows() + { + var sem = new SemaphoreSlim(0); + await InitializeConnectionAsync(async context => + { + var streamIdFeature = context.Features.Get(); + + try + { + var readTask = context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout(); + sem.Release(); + await readTask; + + _runningStreams[streamIdFeature.StreamId].TrySetException(new Exception("ReadAsync was expected to throw.")); + } + catch (IOException) // Expected failure + { + await context.Response.Body.WriteAsync(new byte[10], 0, 10); + + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamIdFeature.StreamId); + } + + _runningStreams[streamIdFeature.StreamId].TrySetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamIdFeature.StreamId].TrySetException(ex); + } + }); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: false); + await sem.WaitAsync().DefaultTimeout(); + await SendRstStreamAsync(1); + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + + await SendGoAwayAsync(); + + // No data is received from the stream since it was aborted before writing anything + await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + // TODO: Check logs + } + + [Fact] + public async Task RST_STREAM_IncompleteRequest_RequestBodyThrows() + { + var sem = new SemaphoreSlim(0); + await InitializeConnectionAsync(async context => + { + var streamIdFeature = context.Features.Get(); + + try + { + var read = await context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout(); + var readTask = context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout(); + sem.Release(); + await readTask; + + _runningStreams[streamIdFeature.StreamId].TrySetException(new Exception("ReadAsync was expected to throw.")); + } + catch (IOException) // Expected failure + { + await context.Response.Body.WriteAsync(new byte[10], 0, 10); + + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamIdFeature.StreamId); + } + + _runningStreams[streamIdFeature.StreamId].TrySetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamIdFeature.StreamId].TrySetException(ex); + } + }); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: false); + await SendDataAsync(1, new byte[10], endStream: false); + await sem.WaitAsync().DefaultTimeout(); + await SendRstStreamAsync(1); + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + + await SendGoAwayAsync(); + + // No data is received from the stream since it was aborted before writing anything + await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + // TODO: Check logs } [Fact] @@ -1940,6 +2407,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests expectedErrorMessage: CoreStrings.Http2ErrorSettingsLengthNotMultipleOfSix); } + [Fact] + public async Task SETTINGS_Received_WithInitialWindowSizePushingStreamWindowOverMax_ConnectionError() + { + await InitializeConnectionAsync(_waitForAbortApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: false); + + await SendWindowUpdateAsync(1, (int)(Http2PeerSettings.MaxWindowSize - _clientSettings.InitialWindowSize)); + + _clientSettings.InitialWindowSize += 1; + await SendSettingsAsync(); + + await WaitForConnectionErrorAsync( + ignoreNonGoAwayFrames: false, + expectedLastStreamId: 1, + expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR, + expectedErrorMessage: CoreStrings.Http2ErrorInitialWindowSizeInvalid); + } + [Fact] public async Task PUSH_PROMISE_Received_ConnectionError() { @@ -2055,6 +2541,180 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests Assert.Contains(5, _abortedStreamIds); } + [Fact] + public async Task GOAWAY_Received_RelievesConnectionBackpressure() + { + var writeTasks = new Task[6]; + var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length; + var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length; + + // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test. + _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2; + + await InitializeConnectionAsync(async context => + { + var streamId = context.Features.Get().StreamId; + + var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + context.RequestAborted.Register(() => + { + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamId); + abortedTcs.SetResult(null); + } + }); + + try + { + writeTasks[streamId] = writeTcs.Task; + + // Flush headers even if the body can't yet be written because of flow control. + await context.Response.Body.FlushAsync(); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length); + } + + await context.Response.Body.WriteAsync(_maxData, 0, remainingBytesBeforeBackpressure + 1); + + writeTcs.SetResult(null); + + await abortedTcs.Task; + + _runningStreams[streamId].SetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamId].SetException(ex); + throw; + } + }); + + // Start one stream that consumes the entire connection output window. + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await ExpectAsync(Http2FrameType.DATA, + withLength: _maxData.Length, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + } + + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesBeforeBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + Assert.False(writeTasks[1].IsCompleted); + + // Start two more streams that immediately experience backpressure. + // The headers, but not the data for the stream, can still be sent. + await StartStreamAsync(3, _browserRequestHeaders, endStream: true); + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 3); + + await StartStreamAsync(5, _browserRequestHeaders, endStream: true); + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 5); + + await SendGoAwayAsync(); + + await WaitForConnectionStopAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false); + + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + Assert.Contains(3, _abortedStreamIds); + Assert.Contains(5, _abortedStreamIds); + } + + [Fact] + public async Task GOAWAY_Received_RelievesStreamBackpressure() + { + var writeTasks = new Task[6]; + var initialWindowSize = _helloWorldBytes.Length / 2; + + // This only affects the stream windows. The connection-level window is always initialized at 64KiB. + _clientSettings.InitialWindowSize = (uint)initialWindowSize; + + await InitializeConnectionAsync(async context => + { + var streamId = context.Features.Get().StreamId; + + var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + context.RequestAborted.Register(() => + { + lock (_abortedStreamIdsLock) + { + _abortedStreamIds.Add(streamId); + abortedTcs.SetResult(null); + } + }); + + try + { + writeTasks[streamId] = writeTcs.Task; + await context.Response.Body.WriteAsync(_helloWorldBytes, 0, _helloWorldBytes.Length); + writeTcs.SetResult(null); + + await abortedTcs.Task; + + _runningStreams[streamId].SetResult(null); + } + catch (Exception ex) + { + _runningStreams[streamId].SetException(ex); + throw; + } + }); + + async Task VerifyStreamBackpressure(int streamId) + { + await StartStreamAsync(streamId, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: streamId); + + var dataFrame = await ExpectAsync(Http2FrameType.DATA, + withLength: initialWindowSize, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: streamId); + + Assert.Equal(dataFrame.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize)); + Assert.False(writeTasks[streamId].IsCompleted); + } + + await VerifyStreamBackpressure(1); + await VerifyStreamBackpressure(3); + await VerifyStreamBackpressure(5); + + await SendGoAwayAsync(); + + await WaitForConnectionStopAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false); + + await WaitForAllStreamsAsync(); + Assert.Contains(1, _abortedStreamIds); + Assert.Contains(3, _abortedStreamIds); + Assert.Contains(5, _abortedStreamIds); + } + [Fact] public async Task GOAWAY_Received_StreamIdNotZero_ConnectionError() { @@ -2174,6 +2834,230 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests expectedErrorMessage: CoreStrings.FormatHttp2ErrorStreamIdle(Http2FrameType.WINDOW_UPDATE, streamId: 1)); } + [Fact] + public async Task WINDOW_UPDATE_Received_OnConnection_IncreasesWindowAboveMaxValue_ConnectionError() + { + var maxIncrement = (int)(Http2PeerSettings.MaxWindowSize - Http2PeerSettings.DefaultInitialWindowSize); + + await InitializeConnectionAsync(_noopApplication); + + await SendWindowUpdateAsync(0, sizeIncrement: maxIncrement); + await SendWindowUpdateAsync(0, sizeIncrement: 1); + + await WaitForConnectionErrorAsync( + ignoreNonGoAwayFrames: false, + expectedLastStreamId: 0, + expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR, + expectedErrorMessage: CoreStrings.Http2ErrorWindowUpdateSizeInvalid); + } + + [Fact] + public async Task WINDOW_UPDATE_Received_OnStream_IncreasesWindowAboveMaxValue_StreamError() + { + var maxIncrement = (int)(Http2PeerSettings.MaxWindowSize - Http2PeerSettings.DefaultInitialWindowSize); + + await InitializeConnectionAsync(_waitForAbortApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + await SendWindowUpdateAsync(1, sizeIncrement: maxIncrement); + await SendWindowUpdateAsync(1, sizeIncrement: 1); + + await WaitForStreamErrorAsync( + expectedStreamId: 1, + expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR, + expectedErrorMessage: CoreStrings.Http2ErrorWindowUpdateSizeInvalid); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + } + + [Fact] + public async Task WINDOW_UPDATE_Received_OnConnection_Respected() + { + var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length; + + // Use this semaphore to wait until a new data frame is expected before trying to send it. + // This way we're sure that if Response.Body.WriteAsync returns an incomplete task, it's because + // of the flow control window and not Pipe backpressure. + var expectingDataSem = new SemaphoreSlim(0); + var backpressureObservedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var backpressureReleasedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test. + _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2; + + await InitializeConnectionAsync(async context => + { + try + { + // Flush the headers so expectingDataSem is released. + await context.Response.Body.FlushAsync(); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + await expectingDataSem.WaitAsync(); + Assert.True(context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length).IsCompleted); + } + + await expectingDataSem.WaitAsync(); + var lastWriteTask = context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length); + + Assert.False(lastWriteTask.IsCompleted); + backpressureObservedTcs.TrySetResult(null); + + await lastWriteTask; + backpressureReleasedTcs.TrySetResult(null); + } + catch (Exception ex) + { + backpressureObservedTcs.TrySetException(ex); + backpressureReleasedTcs.TrySetException(ex); + throw; + } + }); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) + { + expectingDataSem.Release(); + await ExpectAsync(Http2FrameType.DATA, + withLength: _maxData.Length, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + } + + var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length; + var remainingBytesAfterBackpressure = _maxData.Length - remainingBytesBeforeBackpressure; + + expectingDataSem.Release(); + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesBeforeBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await backpressureObservedTcs.Task.DefaultTimeout(); + + await SendWindowUpdateAsync(0, remainingBytesAfterBackpressure); + + await backpressureReleasedTcs.Task.DefaultTimeout(); + + // This is the remaining data that could have come in the last frame if not for the flow control window, + // so there's no need to release the semaphore again. + await ExpectAsync(Http2FrameType.DATA, + withLength: remainingBytesAfterBackpressure, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + } + + [Fact] + public async Task WINDOW_UPDATE_Received_OnStream_Respected() + { + var initialWindowSize = _helloWorldBytes.Length / 2; + + // This only affects the stream windows. The connection-level window is always initialized at 64KiB. + _clientSettings.InitialWindowSize = (uint)initialWindowSize; + + await InitializeConnectionAsync(_echoApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: false); + await SendDataAsync(1, _helloWorldBytes, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + var dataFrame1 = await ExpectAsync(Http2FrameType.DATA, + withLength: initialWindowSize, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(1, initialWindowSize); + + var dataFrame2 = await ExpectAsync(Http2FrameType.DATA, + withLength: initialWindowSize, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + Assert.Equal(dataFrame1.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize)); + Assert.Equal(dataFrame2.DataPayload, new ArraySegment(_helloWorldBytes, initialWindowSize, initialWindowSize)); + } + + [Fact] + public async Task WINDOW_UPDATE_Received_OnStream_Respected_WhenInitialWindowSizeReducedMidStream() + { + // This only affects the stream windows. The connection-level window is always initialized at 64KiB. + _clientSettings.InitialWindowSize = 6; + + await InitializeConnectionAsync(_echoApplication); + + await StartStreamAsync(1, _browserRequestHeaders, endStream: false); + await SendDataAsync(1, _helloWorldBytes, endStream: true); + + await ExpectAsync(Http2FrameType.HEADERS, + withLength: 37, + withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, + withStreamId: 1); + + var dataFrame1 = await ExpectAsync(Http2FrameType.DATA, + withLength: 6, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + // Reduce the initial window size for response data by 3 bytes. + _clientSettings.InitialWindowSize = 3; + await SendSettingsAsync(); + + await ExpectAsync(Http2FrameType.SETTINGS, + withLength: 0, + withFlags: (byte)Http2SettingsFrameFlags.ACK, + withStreamId: 0); + + await SendWindowUpdateAsync(1, 6); + + var dataFrame2 = await ExpectAsync(Http2FrameType.DATA, + withLength: 3, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await SendWindowUpdateAsync(1, 3); + + var dataFrame3 = await ExpectAsync(Http2FrameType.DATA, + withLength: 3, + withFlags: (byte)Http2DataFrameFlags.NONE, + withStreamId: 1); + + await ExpectAsync(Http2FrameType.DATA, + withLength: 0, + withFlags: (byte)Http2DataFrameFlags.END_STREAM, + withStreamId: 1); + + await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); + + Assert.Equal(dataFrame1.DataPayload, new ArraySegment(_helloWorldBytes, 0, 6)); + Assert.Equal(dataFrame2.DataPayload, new ArraySegment(_helloWorldBytes, 6, 3)); + Assert.Equal(dataFrame3.DataPayload, new ArraySegment(_helloWorldBytes, 9, 3)); + } + [Fact] public async Task CONTINUATION_Received_Decoded() { @@ -2313,7 +3197,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests await SendEmptyContinuationFrameAsync(1, Http2ContinuationFrameFlags.END_HEADERS); await WaitForStreamErrorAsync( - ignoreNonRstStreamFrames: false, expectedStreamId: 1, expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR, expectedErrorMessage: CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields); @@ -3028,18 +3911,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests _pair.Application.Output.Complete(); } - private async Task WaitForStreamErrorAsync(bool ignoreNonRstStreamFrames, int expectedStreamId, Http2ErrorCode expectedErrorCode, string expectedErrorMessage) + private async Task WaitForStreamErrorAsync(int expectedStreamId, Http2ErrorCode expectedErrorCode, string expectedErrorMessage) { var frame = await ReceiveFrameAsync(); - if (ignoreNonRstStreamFrames) - { - while (frame.Type != Http2FrameType.RST_STREAM) - { - frame = await ReceiveFrameAsync(); - } - } - Assert.Equal(Http2FrameType.RST_STREAM, frame.Type); Assert.Equal(4, frame.Length); Assert.Equal(0, frame.Flags); diff --git a/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs b/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs index ca916cc4ae..9e2db9ba26 100644 --- a/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs +++ b/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs @@ -54,8 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.Http2 get { var dataset = new TheoryData(); - var toSkip = new[] { "hpack/4.2/1", "http2/5.1/8", "http2/6.9.1/2", "http2/6.9.1/3", "http2/8.1.2.3/1", - "http2/8.1.2.6/1", "http2/8.1.2.6/2" }; + var toSkip = new[] { "hpack/4.2/1", "http2/5.1/8", "http2/8.1.2.3/1", "http2/8.1.2.6/1", "http2/8.1.2.6/2" }; foreach (var testcase in H2SpecCommands.EnumerateTestCases()) {