diff --git a/src/Kestrel.Core/CoreStrings.resx b/src/Kestrel.Core/CoreStrings.resx
index 0530d599a8..6b9ca5514d 100644
--- a/src/Kestrel.Core/CoreStrings.resx
+++ b/src/Kestrel.Core/CoreStrings.resx
@@ -524,4 +524,22 @@ For more information on configuring HTTPS see https://go.microsoft.com/fwlink/?l
Invalid HTTP/2 connection preface.
+
+ The connection or stream was aborted because a write operation was aborted with a CancellationToken.
+
+
+ The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size.
+
+
+ The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size.
+
+
+ The HTTP/2 connection faulted.
+
+
+ The client reset the request stream.
+
+
+ The request stream was aborted.
+
\ No newline at end of file
diff --git a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs
index 0fd62301bd..ce04650af2 100644
--- a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs
+++ b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs
@@ -4,12 +4,10 @@
using System;
using System.Buffers;
using System.IO.Pipelines;
-using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
-using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
@@ -27,6 +25,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private readonly ITimeoutControl _timeoutControl;
private readonly IKestrelTrace _log;
private readonly IBytesWrittenFeature _transportBytesWrittenFeature;
+ private readonly StreamSafePipeFlusher _flusher;
// This locks access to to all of the below fields
private readonly object _contextLock = new object();
@@ -37,16 +36,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private long _totalBytesCommitted;
private readonly PipeWriter _pipeWriter;
-
- // https://github.com/dotnet/corefxlab/issues/1334
- // Pipelines don't support multiple awaiters on flush
- // this is temporary until it does
- private TaskCompletionSource _flushTcs;
- private readonly object _flushLock = new object();
- private Action _flushCompleted;
-
- private ValueTask _flushTask;
-
public Http1OutputProducer(
PipeWriter pipeWriter,
string connectionId,
@@ -60,11 +49,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_connectionContext = connectionContext;
_timeoutControl = timeoutControl;
_log = log;
- _flushCompleted = OnFlushCompleted;
_transportBytesWrittenFeature = transportBytesWrittenFeature;
+ _flusher = new StreamSafePipeFlusher(pipeWriter, timeoutControl);
}
- public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default(CancellationToken))
+ public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -74,12 +63,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return WriteAsync(buffer, cancellationToken);
}
- public Task WriteStreamSuffixAsync(CancellationToken cancellationToken)
+ public Task WriteStreamSuffixAsync()
{
- return WriteAsync(_endChunkedResponseBytes.Span, cancellationToken);
+ return WriteAsync(_endChunkedResponseBytes.Span);
}
- public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public Task FlushAsync(CancellationToken cancellationToken = default)
{
return WriteAsync(Constants.EmptyData, cancellationToken);
}
@@ -191,17 +180,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
}
}
- public Task Write100ContinueAsync(CancellationToken cancellationToken)
+ public Task Write100ContinueAsync()
{
- return WriteAsync(_continueBytes.Span, default(CancellationToken));
+ return WriteAsync(_continueBytes.Span);
}
private Task WriteAsync(
ReadOnlySpan buffer,
- CancellationToken cancellationToken)
+ CancellationToken cancellationToken = default)
{
- var writableBuffer = default(PipeWriter);
- long bytesWritten = 0;
lock (_contextLock)
{
if (_completed)
@@ -209,8 +196,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return Task.CompletedTask;
}
- writableBuffer = _pipeWriter;
- var writer = new CountingBufferWriter(writableBuffer);
+ var writer = new CountingBufferWriter(_pipeWriter);
if (buffer.Length > 0)
{
writer.Write(buffer);
@@ -220,74 +206,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
}
writer.Commit();
- bytesWritten = _unflushedBytes;
+ var bytesWritten = _unflushedBytes;
_unflushedBytes = 0;
- }
- return FlushAsync(writableBuffer, bytesWritten, cancellationToken);
- }
-
- // Single caller, at end of method - so inline
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private Task FlushAsync(PipeWriter writableBuffer, long bytesWritten, CancellationToken cancellationToken)
- {
- var awaitable = writableBuffer.FlushAsync(cancellationToken);
- if (awaitable.IsCompleted)
- {
- // The flush task can't fail today
- return Task.CompletedTask;
- }
- return FlushAsyncAwaited(awaitable, bytesWritten, cancellationToken);
- }
-
- private async Task FlushAsyncAwaited(ValueTask awaitable, long count, CancellationToken cancellationToken)
- {
- // https://github.com/dotnet/corefxlab/issues/1334
- // Since the flush awaitable doesn't currently support multiple awaiters
- // we need to use a task to track the callbacks.
- // All awaiters get the same task
- lock (_flushLock)
- {
- _flushTask = awaitable;
- if (_flushTcs == null || _flushTcs.Task.IsCompleted)
- {
- _flushTcs = new TaskCompletionSource();
-
- _flushTask.GetAwaiter().OnCompleted(_flushCompleted);
- }
- }
-
- _timeoutControl.StartTimingWrite(count);
- try
- {
- await _flushTcs.Task;
- cancellationToken.ThrowIfCancellationRequested();
- }
- catch (OperationCanceledException)
- {
- _completed = true;
- throw;
- }
- finally
- {
- _timeoutControl.StopTimingWrite();
- }
- }
-
- private void OnFlushCompleted()
- {
- try
- {
- _flushTask.GetAwaiter().GetResult();
- _flushTcs.TrySetResult(null);
- }
- catch (Exception exception)
- {
- _flushTcs.TrySetResult(exception);
- }
- finally
- {
- _flushTask = default;
+ return _flusher.FlushAsync(bytesWritten, this, cancellationToken);
}
}
}
diff --git a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs
index 0c759c4451..e572f8b2c0 100644
--- a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs
+++ b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs
@@ -429,7 +429,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
///
/// Immediately kill the connection and poison the request and response streams with an error if there is one.
///
- public void Abort(ConnectionAbortedException abortReason)
+ public virtual void Abort(ConnectionAbortedException abortReason)
{
if (Interlocked.Exchange(ref _requestAborted, 1) != 0)
{
@@ -966,7 +966,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
RequestHeaders.TryGetValue("Expect", out var expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
- Output.Write100ContinueAsync(default(CancellationToken)).GetAwaiter().GetResult();
+ Output.Write100ContinueAsync().GetAwaiter().GetResult();
}
}
@@ -1097,7 +1097,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// For the same reason we call CheckLastWrite() in Content-Length responses.
_abortedCts = null;
- await Output.WriteStreamSuffixAsync(default(CancellationToken));
+ await Output.WriteStreamSuffixAsync();
if (_keepAlive)
{
diff --git a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs
index 41dfdbbbec..6dbdaca7f4 100644
--- a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs
+++ b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs
@@ -14,12 +14,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
void Abort(ConnectionAbortedException abortReason);
Task WriteAsync(Func callback, T state);
Task FlushAsync(CancellationToken cancellationToken);
- Task Write100ContinueAsync(CancellationToken cancellationToken);
+ Task Write100ContinueAsync();
void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders);
// The reason this is ReadOnlySpan and not ReadOnlyMemory is because writes are always
// synchronous. Flushing to get back pressure is the only time we truly go async but
// that's after the buffer is copied
Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken);
- Task WriteStreamSuffixAsync(CancellationToken cancellationToken);
+ Task WriteStreamSuffixAsync();
}
}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs
index 053388fb53..15c47b381c 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs
@@ -61,6 +61,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly Http2ConnectionContext _context;
private readonly Http2FrameWriter _frameWriter;
private readonly HPackDecoder _hpackDecoder;
+ private readonly Http2OutputFlowControl _outputFlowControl = new Http2OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings();
private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings();
@@ -80,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public Http2Connection(Http2ConnectionContext context)
{
_context = context;
- _frameWriter = new Http2FrameWriter(context.Transport.Output, context.Application.Input);
+ _frameWriter = new Http2FrameWriter(context.Transport.Output, context.Application.Input, _outputFlowControl, this);
_hpackDecoder = new HPackDecoder((int)_serverSettings.HeaderTableSize);
}
@@ -95,7 +96,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public void OnInputOrOutputCompleted()
{
_stopping = true;
- _frameWriter.Abort(ex: null);
+ _frameWriter.Complete();
}
public void Abort(ConnectionAbortedException ex)
@@ -143,7 +144,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
if (Http2FrameReader.ReadFrame(readableBuffer, _incomingFrame, _serverSettings.MaxFrameSize, out consumed, out examined))
{
Log.LogTrace($"Connection id {ConnectionId} received {_incomingFrame.Type} frame with flags 0x{_incomingFrame.Flags:x} and length {_incomingFrame.Length} for stream ID {_incomingFrame.StreamId}");
- await ProcessFrameAsync(application);
+ await ProcessFrameAsync(application);
}
}
else if (result.IsCompleted)
@@ -151,6 +152,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
return;
}
}
+ catch (Http2StreamErrorException ex)
+ {
+ Log.Http2StreamError(ConnectionId, ex);
+ AbortStream(_incomingFrame.StreamId, new ConnectionAbortedException(ex.Message, ex));
+ await _frameWriter.WriteRstStreamAsync(ex.StreamId, ex.ErrorCode);
+ }
finally
{
Input.AdvanceTo(consumed, examined);
@@ -187,20 +194,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
finally
{
+ var connectionError = error as ConnectionAbortedException
+ ?? new ConnectionAbortedException(CoreStrings.Http2ConnectionFaulted, error);
+
try
{
foreach (var stream in _streams.Values)
{
- stream.Http2Abort(error);
+ stream.Abort(connectionError);
}
await _frameWriter.WriteGoAwayAsync(_highestOpenedStreamId, errorCode);
_frameWriter.Complete();
}
+ catch
+ {
+ _frameWriter.Abort(connectionError);
+ throw;
+ }
finally
{
Input.Complete();
- _frameWriter.Abort(ex: null);
}
}
}
@@ -296,7 +310,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
case Http2FrameType.DATA:
return ProcessDataFrameAsync();
case Http2FrameType.HEADERS:
- return ProcessHeadersFrameAsync(application);
+ return ProcessHeadersFrameAsync(application);
case Http2FrameType.PRIORITY:
return ProcessPriorityFrameAsync();
case Http2FrameType.RST_STREAM:
@@ -312,7 +326,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
case Http2FrameType.WINDOW_UPDATE:
return ProcessWindowUpdateFrameAsync();
case Http2FrameType.CONTINUATION:
- return ProcessContinuationFrameAsync(application);
+ return ProcessContinuationFrameAsync(application);
default:
return ProcessUnknownFrameAsync();
}
@@ -442,7 +456,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
LocalEndPoint = _context.LocalEndPoint,
RemoteEndPoint = _context.RemoteEndPoint,
StreamLifetimeHandler = this,
- FrameWriter = _frameWriter
+ ClientPeerSettings = _clientSettings,
+ FrameWriter = _frameWriter,
+ ConnectionOutputFlowControl = _outputFlowControl,
+ TimeoutControl = this,
});
if ((_incomingFrame.HeadersFlags & Http2HeadersFrameFlags.END_STREAM) == Http2HeadersFrameFlags.END_STREAM)
@@ -500,11 +517,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
ThrowIfIncomingFrameSentToIdleStream();
-
- if (_streams.TryGetValue(_incomingFrame.StreamId, out var stream))
- {
- stream.Abort(abortReason: null);
- }
+ AbortStream(_incomingFrame.StreamId, new ConnectionAbortedException(CoreStrings.Http2StreamResetByClient));
return Task.CompletedTask;
}
@@ -533,7 +546,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
try
{
+ // ParseFrame will not parse an InitialWindowSize > int.MaxValue.
+ var previousInitialWindowSize = (int)_clientSettings.InitialWindowSize;
+
_clientSettings.ParseFrame(_incomingFrame);
+
+ // This difference can be negative.
+ var windowSizeDifference = (int)_clientSettings.InitialWindowSize - previousInitialWindowSize;
+
+ if (windowSizeDifference != 0)
+ {
+ foreach (var stream in _streams.Values)
+ {
+ if (!stream.TryUpdateOutputWindow(windowSizeDifference))
+ {
+ // This means that this caused a stream window to become larger than int.MaxValue.
+ // This can never happen with a well behaved client and MUST be treated as a connection error.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2
+ throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorInitialWindowSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR);
+ }
+ }
+ }
+
return _frameWriter.WriteSettingsAckAsync();
}
catch (Http2SettingsParameterOutOfRangeException ex)
@@ -620,6 +654,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorWindowUpdateIncrementZero, Http2ErrorCode.PROTOCOL_ERROR);
}
+ if (_incomingFrame.StreamId == 0)
+ {
+ if (!_frameWriter.TryUpdateConnectionWindow(_incomingFrame.WindowUpdateSizeIncrement))
+ {
+ throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorWindowUpdateSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR);
+ }
+ }
+ else if (_streams.TryGetValue(_incomingFrame.StreamId, out var stream))
+ {
+ if (!stream.TryUpdateOutputWindow(_incomingFrame.WindowUpdateSizeIncrement))
+ {
+ throw new Http2StreamErrorException(_incomingFrame.StreamId, CoreStrings.Http2ErrorWindowUpdateSizeInvalid, Http2ErrorCode.FLOW_CONTROL_ERROR);
+ }
+ }
+ else
+ {
+ // The stream was not found in the dictionary which means the stream was probably closed. This can
+ // happen when the client sends a window update for a stream right as the server closes the same stream
+ // Since this is an unavoidable race, we just ignore the window update frame.
+ }
+
return Task.CompletedTask;
}
@@ -669,11 +724,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
ResetRequestHeaderParsingState();
}
}
- catch (Http2StreamErrorException ex)
+ catch (Http2StreamErrorException)
{
- Log.Http2StreamError(ConnectionId, ex);
ResetRequestHeaderParsingState();
- return _frameWriter.WriteRstStreamAsync(ex.StreamId, ex.ErrorCode);
+ throw;
}
return Task.CompletedTask;
@@ -745,6 +799,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
+ private void AbortStream(int streamId, ConnectionAbortedException error)
+ {
+ if (_streams.TryGetValue(streamId, out var stream))
+ {
+ stream.Abort(error);
+ }
+ }
+
void IHttp2StreamLifetimeHandler.OnStreamCompleted(int streamId)
{
_streams.TryRemove(streamId, out _);
diff --git a/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs b/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs
index 6958b376f5..deb06ac02a 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2Frame.WindowUpdate.cs
@@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
public int WindowUpdateSizeIncrement
{
- get => ((Payload[0] << 24) | (Payload[1] << 16) | (Payload[2] << 16) | Payload[3]) & 0x7fffffff;
+ get => ((Payload[0] << 24) | (Payload[1] << 16) | (Payload[2] << 8) | Payload[3]) & 0x7fffffff;
set
{
Payload[0] = (byte)(((uint)value >> 24) & 0x7f);
diff --git a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs
index 0d693c0ced..00e2968de1 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs
@@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
public partial class Http2Frame
{
- public const int MinAllowedMaxFrameSize = 16 * 1024;
+ public const int MinAllowedMaxFrameSize = 16 * 1024;
public const int MaxAllowedMaxFrameSize = 16 * 1024 * 1024 - 1;
public const int HeaderLength = 9;
@@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
get => _data[FlagsOffset];
set
{
- _data[FlagsOffset] = (byte)value;
+ _data[FlagsOffset] = value;
}
}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs
index df20e02638..f010264d78 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs
@@ -7,13 +7,15 @@ using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
- public class Http2FrameWriter : IHttp2FrameWriter
+ public class Http2FrameWriter
{
// Literal Header Field without Indexing - Indexed Name (Index 8 - :status)
private static readonly byte[] _continueBytes = new byte[] { 0x08, 0x03, (byte)'1', (byte)'0', (byte)'0' };
@@ -23,13 +25,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly PipeWriter _outputWriter;
private readonly PipeReader _outputReader;
+ private readonly Http2OutputFlowControl _connectionOutputFlowControl;
+ private readonly StreamSafePipeFlusher _flusher;
private bool _completed;
- public Http2FrameWriter(PipeWriter outputPipeWriter, PipeReader outputPipeReader)
+ public Http2FrameWriter(
+ PipeWriter outputPipeWriter,
+ PipeReader outputPipeReader,
+ Http2OutputFlowControl connectionOutputFlowControl,
+ ITimeoutControl timeoutControl)
{
_outputWriter = outputPipeWriter;
_outputReader = outputPipeReader;
+
+ _connectionOutputFlowControl = connectionOutputFlowControl;
+ _flusher = new StreamSafePipeFlusher(_outputWriter, timeoutControl);
}
public void Complete()
@@ -42,30 +53,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
_completed = true;
+ _connectionOutputFlowControl.Abort();
_outputWriter.Complete();
}
}
- public void Abort(Exception ex)
+ public void Abort(ConnectionAbortedException ex)
+ {
+ // TODO: Really abort the connection using the ConnectionContex like Http1OutputProducer.
+ _outputReader.CancelPendingRead();
+ Complete();
+ }
+
+ public Task FlushAsync(IHttpOutputProducer outputProducer, CancellationToken cancellationToken)
{
lock (_writeLock)
{
if (_completed)
{
- return;
+ return Task.CompletedTask;
}
- _completed = true;
- _outputReader.CancelPendingRead();
- _outputWriter.Complete(ex);
- }
- }
-
- public Task FlushAsync(CancellationToken cancellationToken)
- {
- lock (_writeLock)
- {
- return WriteAsync(Constants.EmptyData);
+ return _flusher.FlushAsync(0, outputProducer, cancellationToken);
}
}
@@ -77,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_outgoingFrame.Length = _continueBytes.Length;
_continueBytes.CopyTo(_outgoingFrame.HeadersPayload);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
@@ -85,6 +94,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
lock (_writeLock)
{
+ if (_completed)
+ {
+ return;
+ }
+
_outgoingFrame.PrepareHeaders(Http2HeadersFrameFlags.NONE, streamId);
var done = _hpackEncoder.BeginEncode(statusCode, EnumerateHeaders(headers), _outgoingFrame.Payload, out var payloadLength);
@@ -95,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_outgoingFrame.HeadersFlags = Http2HeadersFrameFlags.END_HEADERS;
}
- Append(_outgoingFrame.Raw);
+ _outputWriter.Write(_outgoingFrame.Raw);
while (!done)
{
@@ -109,51 +123,128 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_outgoingFrame.ContinuationFlags = Http2ContinuationFrameFlags.END_HEADERS;
}
- Append(_outgoingFrame.Raw);
+ _outputWriter.Write(_outgoingFrame.Raw);
}
}
}
- public Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken)
- => WriteDataAsync(streamId, data, endStream: false, cancellationToken: cancellationToken);
-
- public Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken)
+ public Task WriteDataAsync(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence data, bool endStream)
{
- var tasks = new List();
+ // The Length property of a ReadOnlySequence can be expensive, so we cache the value.
+ var dataLength = data.Length;
lock (_writeLock)
{
- _outgoingFrame.PrepareData(streamId);
-
- while (data.Length > _outgoingFrame.Length)
+ if (_completed || flowControl.IsAborted)
{
- data.Slice(0, _outgoingFrame.Length).CopyTo(_outgoingFrame.Payload);
- data = data.Slice(_outgoingFrame.Length);
-
- tasks.Add(WriteAsync(_outgoingFrame.Raw, cancellationToken));
+ return Task.CompletedTask;
}
- _outgoingFrame.Length = data.Length;
-
- if (endStream)
+ // Zero-length data frames are allowed to be sent immediately even if there is no space available in the flow control window.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
+ if (dataLength != 0 && dataLength > flowControl.Available)
{
- _outgoingFrame.DataFlags = Http2DataFrameFlags.END_STREAM;
+ return WriteDataAsyncAwaited(streamId, flowControl, data, dataLength, endStream);
}
- data.CopyTo(_outgoingFrame.Payload);
-
- tasks.Add(WriteAsync(_outgoingFrame.Raw, cancellationToken));
-
- return Task.WhenAll(tasks);
+ // This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window.
+ flowControl.Advance((int)dataLength);
+ return WriteDataUnsynchronizedAsync(streamId, data, endStream);
}
}
+ private Task WriteDataUnsynchronizedAsync(int streamId, ReadOnlySequence data, bool endStream)
+ {
+ _outgoingFrame.PrepareData(streamId);
+
+ var payload = _outgoingFrame.Payload;
+ var unwrittenPayloadLength = 0;
+
+ foreach (var buffer in data)
+ {
+ var current = buffer;
+
+ while (current.Length > payload.Length)
+ {
+ current.Span.Slice(0, payload.Length).CopyTo(payload);
+ current = current.Slice(payload.Length);
+
+ _outputWriter.Write(_outgoingFrame.Raw);
+ payload = _outgoingFrame.Payload;
+ unwrittenPayloadLength = 0;
+ }
+
+ if (current.Length > 0)
+ {
+ current.Span.CopyTo(payload);
+ payload = payload.Slice(current.Length);
+ unwrittenPayloadLength += current.Length;
+ }
+ }
+
+ if (endStream)
+ {
+ _outgoingFrame.DataFlags = Http2DataFrameFlags.END_STREAM;
+ }
+
+ _outgoingFrame.Length = unwrittenPayloadLength;
+ _outputWriter.Write(_outgoingFrame.Raw);
+
+ return FlushUnsynchronizedAsync();
+ }
+
+ private async Task WriteDataAsyncAwaited(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool endStream)
+ {
+ while (dataLength > 0)
+ {
+ Http2OutputFlowControlAwaitable availabilityAwaitable;
+ var writeTask = Task.CompletedTask;
+
+ lock (_writeLock)
+ {
+ if (_completed || flowControl.IsAborted)
+ {
+ break;
+ }
+
+ var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable);
+
+ if (actual > 0)
+ {
+ if (actual < dataLength)
+ {
+ writeTask = WriteDataUnsynchronizedAsync(streamId, data.Slice(0, actual), endStream: false);
+ data = data.Slice(actual);
+ dataLength -= actual;
+ }
+ else
+ {
+ writeTask = WriteDataUnsynchronizedAsync(streamId, data, endStream);
+ dataLength = 0;
+ }
+ }
+ }
+
+ // This awaitable releases continuations in FIFO order when the window updates.
+ // It should be very rare for a continuation to run without any availability.
+ if (availabilityAwaitable != null)
+ {
+ await availabilityAwaitable;
+ }
+
+ await writeTask;
+ }
+
+ // Ensure that the application continuation isn't executed inline by ProcessWindowUpdateFrameAsync.
+ await ThreadPoolAwaitable.Instance;
+ }
+
public Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode)
{
lock (_writeLock)
{
_outgoingFrame.PrepareRstStream(streamId, errorCode);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
@@ -163,7 +254,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
// TODO: actually send settings
_outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.NONE);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
@@ -172,7 +263,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_writeLock)
{
_outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.ACK);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
@@ -182,7 +273,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
_outgoingFrame.PreparePing(Http2PingFrameFlags.ACK);
payload.CopyTo(_outgoingFrame.Payload);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
@@ -191,23 +282,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_writeLock)
{
_outgoingFrame.PrepareGoAway(lastStreamId, errorCode);
- return WriteAsync(_outgoingFrame.Raw);
+ return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
}
}
- // Must be called with _writeLock
- private void Append(ReadOnlySpan data)
- {
- if (_completed)
- {
- return;
- }
-
- _outputWriter.Write(data);
- }
-
- // Must be called with _writeLock
- private Task WriteAsync(ReadOnlySpan data, CancellationToken cancellationToken = default(CancellationToken))
+ private Task WriteUnsynchronizedAsync(ReadOnlySpan data)
{
if (_completed)
{
@@ -215,12 +294,36 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
_outputWriter.Write(data);
- return FlushAsync(_outputWriter, cancellationToken);
+ return FlushUnsynchronizedAsync();
}
- private async Task FlushAsync(PipeWriter outputWriter, CancellationToken cancellationToken)
+ private Task FlushUnsynchronizedAsync()
{
- await outputWriter.FlushAsync(cancellationToken);
+ return _flusher.FlushAsync();
+ }
+
+ public bool TryUpdateConnectionWindow(int bytes)
+ {
+ lock (_writeLock)
+ {
+ return _connectionOutputFlowControl.TryUpdateWindow(bytes);
+ }
+ }
+
+ public bool TryUpdateStreamWindow(Http2StreamOutputFlowControl flowControl, int bytes)
+ {
+ lock (_writeLock)
+ {
+ return flowControl.TryUpdateWindow(bytes);
+ }
+ }
+
+ public void AbortPendingStreamDataWrites(Http2StreamOutputFlowControl flowControl)
+ {
+ lock (_writeLock)
+ {
+ flowControl.Abort();
+ }
}
private static IEnumerable> EnumerateHeaders(IHeaderDictionary headers)
diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs
new file mode 100644
index 0000000000..4e697cb103
--- /dev/null
+++ b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControl.cs
@@ -0,0 +1,77 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
+{
+ public class Http2OutputFlowControl
+ {
+ private readonly Queue _awaitableQueue = new Queue();
+
+ public Http2OutputFlowControl(uint initialWindowSize)
+ {
+ Debug.Assert(initialWindowSize <= Http2PeerSettings.MaxWindowSize, $"{nameof(initialWindowSize)} too large.");
+
+ Available = (int)initialWindowSize;
+ }
+
+ public int Available { get; private set; }
+ public bool IsAborted { get; private set; }
+
+ public Http2OutputFlowControlAwaitable AvailabilityAwaitable
+ {
+ get
+ {
+ Debug.Assert(!IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
+ Debug.Assert(Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");
+
+ var awaitable = new Http2OutputFlowControlAwaitable();
+ _awaitableQueue.Enqueue(awaitable);
+ return awaitable;
+ }
+ }
+
+ public void Advance(int bytes)
+ {
+ Debug.Assert(!IsAborted, $"({nameof(Advance)} called after abort.");
+ Debug.Assert(bytes == 0 || (bytes > 0 && bytes <= Available), $"{nameof(Advance)}({bytes}) called with {Available} bytes available.");
+
+ Available -= bytes;
+ }
+
+ // bytes can be negative when SETTINGS_INITIAL_WINDOW_SIZE decreases mid-connection.
+ // This can also cause Available to become negative which MUST be allowed.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2
+ public bool TryUpdateWindow(int bytes)
+ {
+ var maxUpdate = Http2PeerSettings.MaxWindowSize - Available;
+
+ if (bytes > maxUpdate)
+ {
+ return false;
+ }
+
+ Available += bytes;
+
+ while (Available > 0 && _awaitableQueue.Count > 0)
+ {
+ var awaitable = _awaitableQueue.Dequeue();
+ awaitable.Complete();
+ }
+
+ return true;
+ }
+
+ public void Abort()
+ {
+ IsAborted = true;
+
+ while (_awaitableQueue.Count > 0)
+ {
+ _awaitableQueue.Dequeue().Complete();
+ }
+ }
+ }
+}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs
new file mode 100644
index 0000000000..a83a0f9b09
--- /dev/null
+++ b/src/Kestrel.Core/Internal/Http2/Http2OutputFlowControlAwaitable.cs
@@ -0,0 +1,48 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
+{
+ public class Http2OutputFlowControlAwaitable : ICriticalNotifyCompletion
+ {
+ private static readonly Action _callbackCompleted = () => { };
+
+ private Action _callback;
+
+ public Http2OutputFlowControlAwaitable GetAwaiter() => this;
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ public void GetResult()
+ {
+ Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
+
+ _callback = null;
+ }
+
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(_callback, _callbackCompleted) ||
+ ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ continuation();
+ }
+ }
+
+ public void UnsafeOnCompleted(Action continuation)
+ {
+ OnCompleted(continuation);
+ }
+
+ public void Complete()
+ {
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+
+ continuation?.Invoke();
+ }
+ }
+}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs
index e701654d1e..03babbedeb 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs
@@ -2,33 +2,79 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
+using System.Buffers;
+using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
+using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
public class Http2OutputProducer : IHttpOutputProducer
{
private readonly int _streamId;
- private readonly IHttp2FrameWriter _frameWriter;
+ private readonly Http2FrameWriter _frameWriter;
+ private readonly StreamSafePipeFlusher _flusher;
- public Http2OutputProducer(int streamId, IHttp2FrameWriter frameWriter)
+ // This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the
+ // FrameWriter's connection-level write lock.
+ private readonly Http2StreamOutputFlowControl _flowControl;
+
+ private readonly object _dataWriterLock = new object();
+ private readonly Pipe _dataPipe;
+ private readonly Task _dataWriteProcessingTask;
+ private bool _startedWritingDataFrames;
+ private bool _completed;
+ private bool _disposed;
+
+ public Http2OutputProducer(
+ int streamId,
+ Http2FrameWriter frameWriter,
+ Http2StreamOutputFlowControl flowControl,
+ ITimeoutControl timeoutControl,
+ MemoryPool pool)
{
_streamId = streamId;
_frameWriter = frameWriter;
+ _flowControl = flowControl;
+ _dataPipe = CreateDataPipe(pool);
+ _flusher = new StreamSafePipeFlusher(_dataPipe.Writer, timeoutControl);
+ _dataWriteProcessingTask = ProcessDataWrites();
}
public void Dispose()
{
+ lock (_dataWriterLock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _disposed = true;
+
+ if (!_completed)
+ {
+ _completed = true;
+
+ // Complete with an exception to prevent an end of stream data frame from being sent without an
+ // explicit call to WriteStreamSuffixAsync. ConnectionAbortedExceptions are swallowed, so the
+ // message doesn't matter
+ _dataPipe.Writer.Complete(new ConnectionAbortedException());
+ }
+
+ _frameWriter.AbortPendingStreamDataWrites(_flowControl);
+ }
}
- public void Abort(ConnectionAbortedException error)
+ public void Abort(ConnectionAbortedException abortReason)
{
// TODO: RST_STREAM?
+ Dispose();
}
public Task WriteAsync(Func callback, T state)
@@ -36,23 +82,142 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
throw new NotImplementedException();
}
- public Task FlushAsync(CancellationToken cancellationToken) => _frameWriter.FlushAsync(cancellationToken);
-
- public Task Write100ContinueAsync(CancellationToken cancellationToken) => _frameWriter.Write100ContinueAsync(_streamId);
-
- public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken)
+ public Task FlushAsync(CancellationToken cancellationToken)
{
- return _frameWriter.WriteDataAsync(_streamId, data, cancellationToken);
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ lock (_dataWriterLock)
+ {
+ if (_completed)
+ {
+ return Task.CompletedTask;
+ }
+
+ if (_startedWritingDataFrames)
+ {
+ // If there's already been response data written to the stream, just wait for that. Any header
+ // should be in front of the data frames in the connection pipe. Trailers could change things.
+ return _flusher.FlushAsync(0, this, cancellationToken);
+ }
+ else
+ {
+ // Flushing the connection pipe ensures headers already in the pipe are flushed even if no data
+ // frames have been written.
+ return _frameWriter.FlushAsync(this, cancellationToken);
+ }
+ }
}
- public Task WriteStreamSuffixAsync(CancellationToken cancellationToken)
+ public Task Write100ContinueAsync()
{
- return _frameWriter.WriteDataAsync(_streamId, Constants.EmptyData, endStream: true, cancellationToken: cancellationToken);
+ lock (_dataWriterLock)
+ {
+ if (_completed)
+ {
+ return Task.CompletedTask;
+ }
+
+ return _frameWriter.Write100ContinueAsync(_streamId);
+ }
}
public void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders)
{
- _frameWriter.WriteResponseHeaders(_streamId, statusCode, responseHeaders);
+ lock (_dataWriterLock)
+ {
+ if (_completed)
+ {
+ return;
+ }
+
+ // The HPACK header compressor is stateful, if we compress headers for an aborted stream we must send them.
+ // Optimize for not compressing or sending them.
+ _frameWriter.WriteResponseHeaders(_streamId, statusCode, responseHeaders);
+ }
}
+
+ public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ lock (_dataWriterLock)
+ {
+ // This length check is important because we don't want to set _startedWritingDataFrames unless a data
+ // frame will actually be written causing the headers to be flushed.
+ if (_completed || data.Length == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ _startedWritingDataFrames = true;
+
+ _dataPipe.Writer.Write(data);
+ return _flusher.FlushAsync(data.Length, this, cancellationToken);
+ }
+ }
+
+ public Task WriteStreamSuffixAsync()
+ {
+ lock (_dataWriterLock)
+ {
+ if (_completed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _completed = true;
+
+ // Even if there's no actual data, completing the writer gracefully sends an END_STREAM DATA frame.
+ _startedWritingDataFrames = true;
+
+ _dataPipe.Writer.Complete();
+ return _dataWriteProcessingTask;
+ }
+ }
+
+ private async Task ProcessDataWrites()
+ {
+ try
+ {
+ ReadResult readResult;
+
+ do
+ {
+ readResult = await _dataPipe.Reader.ReadAsync();
+
+ await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted);
+
+ _dataPipe.Reader.AdvanceTo(readResult.Buffer.End);
+ } while (!readResult.IsCompleted);
+ }
+ catch (ConnectionAbortedException)
+ {
+ // Writes should not throw for aborted connections.
+ }
+ catch (Exception ex)
+ {
+ Debug.Assert(false, ex.ToString());
+ }
+
+ _dataPipe.Reader.Complete();
+ }
+
+ private static Pipe CreateDataPipe(MemoryPool pool)
+ => new Pipe(new PipeOptions
+ (
+ pool: pool,
+ readerScheduler: PipeScheduler.Inline,
+ writerScheduler: PipeScheduler.Inline,
+ pauseWriterThreshold: 1,
+ resumeWriterThreshold: 1,
+ useSynchronizationContext: false,
+ minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
+ ));
}
}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs b/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs
index 4bf4787435..fcf78c4b42 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2PeerSettings.cs
@@ -14,6 +14,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public const uint DefaultInitialWindowSize = 65535;
public const uint DefaultMaxFrameSize = 16384;
public const uint DefaultMaxHeaderListSize = uint.MaxValue;
+ public const uint MaxWindowSize = int.MaxValue;
public uint HeaderTableSize { get; set; } = DefaultHeaderTableSize;
@@ -59,11 +60,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
MaxConcurrentStreams = value;
break;
case Http2SettingsParameter.SETTINGS_INITIAL_WINDOW_SIZE:
- if (value > int.MaxValue)
+ if (value > MaxWindowSize)
{
throw new Http2SettingsParameterOutOfRangeException(Http2SettingsParameter.SETTINGS_INITIAL_WINDOW_SIZE,
lowerBound: 0,
- upperBound: int.MaxValue);
+ upperBound: MaxWindowSize);
}
InitialWindowSize = value;
diff --git a/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs b/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs
index 95db1c9d58..fcaca923b0 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2SettingsParameterOutOfRangeException.cs
@@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
public class Http2SettingsParameterOutOfRangeException : Exception
{
- public Http2SettingsParameterOutOfRangeException(Http2SettingsParameter parameter, uint lowerBound, uint upperBound)
+ public Http2SettingsParameterOutOfRangeException(Http2SettingsParameter parameter, long lowerBound, long upperBound)
: base($"HTTP/2 SETTINGS parameter {parameter} must be set to a value between {lowerBound} and {upperBound}")
{
Parameter = parameter;
diff --git a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs
index 702cd636d5..d47a78be13 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs
@@ -3,13 +3,12 @@
using System;
using System.Buffers;
+using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
-using Microsoft.AspNetCore.Http.Features;
-using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
+using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
-using Microsoft.Extensions.Primitives;
using Microsoft.Net.Http.Headers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
@@ -17,13 +16,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public partial class Http2Stream : HttpProtocol
{
private readonly Http2StreamContext _context;
+ private readonly Http2StreamOutputFlowControl _outputFlowControl;
public Http2Stream(Http2StreamContext context)
: base(context)
{
_context = context;
+ _outputFlowControl = new Http2StreamOutputFlowControl(context.ConnectionOutputFlowControl, context.ClientPeerSettings.InitialWindowSize);
- Output = new Http2OutputProducer(StreamId, _context.FrameWriter);
+ Output = new Http2OutputProducer(context.StreamId, context.FrameWriter, _outputFlowControl, context.TimeoutControl, context.MemoryPool);
}
public int StreamId => _context.StreamId;
@@ -143,15 +144,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
- // TODO: The HTTP/2 tests expect the request and response streams to be aborted with
- // non-ConnectionAbortedExceptions. The abortReasons can include things like
- // Http2ConnectionErrorException which don't derive from IOException or
- // OperationCanceledException. This is probably not a good idea.
- public void Http2Abort(Exception abortReason)
+ public override void Abort(ConnectionAbortedException abortReason)
{
- _streams?.Abort(abortReason);
+ base.Abort(abortReason);
- OnInputOrOutputCompleted();
+ // Unblock the request body.
+ RequestBodyPipe.Writer.Complete(new IOException(CoreStrings.Http2StreamAborted, abortReason));
+ }
+
+ public bool TryUpdateOutputWindow(int bytes)
+ {
+ return _context.FrameWriter.TryUpdateStreamWindow(_outputFlowControl, bytes);
}
}
}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs b/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs
index eea80103a7..d8d24144a2 100644
--- a/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs
+++ b/src/Kestrel.Core/Internal/Http2/Http2StreamContext.cs
@@ -5,6 +5,7 @@ using System.Buffers;
using System.Net;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
+using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
@@ -18,6 +19,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public IPEndPoint RemoteEndPoint { get; set; }
public IPEndPoint LocalEndPoint { get; set; }
public IHttp2StreamLifetimeHandler StreamLifetimeHandler { get; set; }
- public IHttp2FrameWriter FrameWriter { get; set; }
+ public Http2PeerSettings ClientPeerSettings { get; set; }
+ public Http2FrameWriter FrameWriter { get; set; }
+ public Http2OutputFlowControl ConnectionOutputFlowControl { get; set; }
+ public ITimeoutControl TimeoutControl { get; set; }
}
}
diff --git a/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs b/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs
new file mode 100644
index 0000000000..677d821379
--- /dev/null
+++ b/src/Kestrel.Core/Internal/Http2/Http2StreamOutputFlowControl.cs
@@ -0,0 +1,96 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
+{
+ public class Http2StreamOutputFlowControl
+ {
+ private readonly Http2OutputFlowControl _connectionLevelFlowControl;
+ private readonly Http2OutputFlowControl _streamLevelFlowControl;
+
+ private Http2OutputFlowControlAwaitable _currentConnectionLevelAwaitable;
+
+ public Http2StreamOutputFlowControl(Http2OutputFlowControl connectionLevelFlowControl, uint initialWindowSize)
+ {
+ _connectionLevelFlowControl = connectionLevelFlowControl;
+ _streamLevelFlowControl = new Http2OutputFlowControl(initialWindowSize);
+ }
+
+ public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available);
+
+ public bool IsAborted => _connectionLevelFlowControl.IsAborted || _streamLevelFlowControl.IsAborted;
+
+ public void Advance(int bytes)
+ {
+ _connectionLevelFlowControl.Advance(bytes);
+ _streamLevelFlowControl.Advance(bytes);
+ }
+
+ public int AdvanceUpToAndWait(long bytes, out Http2OutputFlowControlAwaitable awaitable)
+ {
+ var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available
+ ? _connectionLevelFlowControl : _streamLevelFlowControl;
+
+ // Clamp ~= Math.Clamp from netcoreapp >= 2.0
+ var actual = Clamp(leastAvailableFlow.Available, 0, bytes);
+
+ // Make sure to advance prior to accessing AvailabilityAwaitable.
+ _connectionLevelFlowControl.Advance(actual);
+ _streamLevelFlowControl.Advance(actual);
+
+ awaitable = null;
+ _currentConnectionLevelAwaitable = null;
+
+ if (actual < bytes)
+ {
+ awaitable = leastAvailableFlow.AvailabilityAwaitable;
+
+ if (leastAvailableFlow == _connectionLevelFlowControl)
+ {
+ _currentConnectionLevelAwaitable = awaitable;
+ }
+ }
+
+ return actual;
+ }
+
+ // The connection-level update window is updated independently.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
+ public bool TryUpdateWindow(int bytes)
+ {
+ return _streamLevelFlowControl.TryUpdateWindow(bytes);
+ }
+
+ public void Abort()
+ {
+ _streamLevelFlowControl.Abort();
+
+ // If this stream is waiting on a connection-level window update, complete this stream's
+ // connection-level awaitable so the stream abort is observed immediately.
+ // This could complete an awaitable still sitting in the connection-level awaitable queue,
+ // but this is safe because completing it again will just no-op.
+ _currentConnectionLevelAwaitable?.Complete();
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static int Clamp(int value, int min, long max)
+ {
+ Debug.Assert(min <= max, $"{nameof(Clamp)} called with a min greater than the max.");
+
+ if (value < min)
+ {
+ return min;
+ }
+ else if (value > max)
+ {
+ return (int)max;
+ }
+
+ return value;
+ }
+ }
+}
diff --git a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs
deleted file mode 100644
index aa7b23330b..0000000000
--- a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (c) .NET Foundation. All rights reserved.
-// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.AspNetCore.Http;
-
-namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
-{
- public interface IHttp2FrameWriter
- {
- void Abort(Exception error);
- Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken));
- Task Write100ContinueAsync(int streamId);
- void WriteResponseHeaders(int streamId, int statusCode, IHeaderDictionary headers);
- Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken);
- Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken);
- Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode);
- Task WriteSettingsAckAsync();
- Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySpan payload);
- Task WriteGoAwayAsync(int lastStreamId, Http2ErrorCode errorCode);
- }
-}
diff --git a/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs b/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs
new file mode 100644
index 0000000000..0caeda11ad
--- /dev/null
+++ b/src/Kestrel.Core/Internal/Infrastructure/StreamSafePipeFlusher.cs
@@ -0,0 +1,80 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Connections;
+using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
+{
+ ///
+ /// This wraps PipeWriter.FlushAsync() in a way that allows multiple awaiters making it safe to call from publicly
+ /// exposed Stream implementations.
+ ///
+ public class StreamSafePipeFlusher
+ {
+ private readonly PipeWriter _writer;
+ private readonly ITimeoutControl _timeoutControl;
+ private readonly object _flushLock = new object();
+
+ private Task _lastFlushTask = Task.CompletedTask;
+
+ public StreamSafePipeFlusher(
+ PipeWriter writer,
+ ITimeoutControl timeoutControl)
+ {
+ _writer = writer;
+ _timeoutControl = timeoutControl;
+ }
+
+ public Task FlushAsync(long count = 0, IHttpOutputProducer outputProducer = null, CancellationToken cancellationToken = default)
+ {
+ var flushValueTask = _writer.FlushAsync(cancellationToken);
+
+ if (flushValueTask.IsCompletedSuccessfully)
+ {
+ return Task.CompletedTask;
+ }
+
+ // https://github.com/dotnet/corefxlab/issues/1334
+ // Pipelines don't support multiple awaiters on flush.
+ // While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes,
+ // it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case,
+ // we find previous flush Task which still accounts for any newly committed bytes and await that.
+ lock (_flushLock)
+ {
+ if (_lastFlushTask.IsCompleted)
+ {
+ _lastFlushTask = flushValueTask.AsTask();
+ }
+
+ return TimeFlushAsync(count, outputProducer, cancellationToken);
+ }
+ }
+
+ private async Task TimeFlushAsync(long count, IHttpOutputProducer outputProducer, CancellationToken cancellationToken)
+ {
+ _timeoutControl.StartTimingWrite(count);
+
+ try
+ {
+ await _lastFlushTask;
+ }
+ catch (OperationCanceledException ex)
+ {
+ outputProducer.Abort(new ConnectionAbortedException(CoreStrings.ConnectionOrStreamAbortedByCancellationToken, ex));
+ }
+ catch
+ {
+ // A canceled token is the only reason flush should ever throw.
+ }
+
+ _timeoutControl.StopTimingWrite();
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+ }
+}
diff --git a/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs b/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs
new file mode 100644
index 0000000000..1b1e080c97
--- /dev/null
+++ b/src/Kestrel.Core/Internal/Infrastructure/ThreadPoolAwaitable.cs
@@ -0,0 +1,35 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
+{
+ public class ThreadPoolAwaitable : ICriticalNotifyCompletion
+ {
+ public static ThreadPoolAwaitable Instance = new ThreadPoolAwaitable();
+
+ private ThreadPoolAwaitable()
+ {
+ }
+
+ public ThreadPoolAwaitable GetAwaiter() => this;
+ public bool IsCompleted => false;
+
+ public void GetResult()
+ {
+ }
+
+ public void OnCompleted(Action continuation)
+ {
+ ThreadPool.QueueUserWorkItem(state => ((Action)state)(), continuation);
+ }
+
+ public void UnsafeOnCompleted(Action continuation)
+ {
+ OnCompleted(continuation);
+ }
+ }
+}
diff --git a/src/Kestrel.Core/Properties/CoreStrings.Designer.cs b/src/Kestrel.Core/Properties/CoreStrings.Designer.cs
index 29d9ac9bb3..6e6f832906 100644
--- a/src/Kestrel.Core/Properties/CoreStrings.Designer.cs
+++ b/src/Kestrel.Core/Properties/CoreStrings.Designer.cs
@@ -1904,6 +1904,90 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
internal static string FormatHttp2ErrorInvalidPreface()
=> GetString("Http2ErrorInvalidPreface");
+ ///
+ /// The connection or stream was aborted because a write operation was aborted with a CancellationToken.
+ ///
+ internal static string ConnectionOrStreamAbortedByCancellationToken
+ {
+ get => GetString("ConnectionOrStreamAbortedByCancellationToken");
+ }
+
+ ///
+ /// The connection or stream was aborted because a write operation was aborted with a CancellationToken.
+ ///
+ internal static string FormatConnectionOrStreamAbortedByCancellationToken()
+ => GetString("ConnectionOrStreamAbortedByCancellationToken");
+
+ ///
+ /// The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size.
+ ///
+ internal static string Http2ErrorInitialWindowSizeInvalid
+ {
+ get => GetString("Http2ErrorInitialWindowSizeInvalid");
+ }
+
+ ///
+ /// The client sent a SETTINGS frame with a SETTINGS_INITIAL_WINDOW_SIZE that caused a flow-control window to exceed the maximum size.
+ ///
+ internal static string FormatHttp2ErrorInitialWindowSizeInvalid()
+ => GetString("Http2ErrorInitialWindowSizeInvalid");
+
+ ///
+ /// The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size.
+ ///
+ internal static string Http2ErrorWindowUpdateSizeInvalid
+ {
+ get => GetString("Http2ErrorWindowUpdateSizeInvalid");
+ }
+
+ ///
+ /// The client sent a WINDOW_UPDATE frame that caused a flow-control window to exceed the maximum size.
+ ///
+ internal static string FormatHttp2ErrorWindowUpdateSizeInvalid()
+ => GetString("Http2ErrorWindowUpdateSizeInvalid");
+
+ ///
+ /// The HTTP/2 connection faulted.
+ ///
+ internal static string Http2ConnectionFaulted
+ {
+ get => GetString("Http2ConnectionFaulted");
+ }
+
+ ///
+ /// The HTTP/2 connection faulted.
+ ///
+ internal static string FormatHttp2ConnectionFaulted()
+ => GetString("Http2ConnectionFaulted");
+
+ ///
+ /// The client reset the request stream.
+ ///
+ internal static string Http2StreamResetByClient
+ {
+ get => GetString("Http2StreamResetByClient");
+ }
+
+ ///
+ /// The client reset the request stream.
+ ///
+ internal static string FormatHttp2StreamResetByClient()
+ => GetString("Http2StreamResetByClient");
+
+ ///
+ /// The request stream was aborted.
+ ///
+ internal static string Http2StreamAborted
+ {
+ get => GetString("Http2StreamAborted");
+ }
+
+ ///
+ /// The request stream was aborted.
+ ///
+ internal static string FormatHttp2StreamAborted()
+ => GetString("Http2StreamAborted");
+
private static string GetString(string name, params string[] formatterNames)
{
var value = _resourceManager.GetString(name);
diff --git a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs
index fae8ffb2fb..e6871565ab 100644
--- a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs
+++ b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs
@@ -125,6 +125,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private readonly RequestDelegate _largeHeadersApplication;
private readonly RequestDelegate _waitForAbortApplication;
private readonly RequestDelegate _waitForAbortFlushingApplication;
+ private readonly RequestDelegate _waitForAbortWithDataApplication;
private Task _connectionTask;
@@ -269,6 +270,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_runningStreams[streamIdFeature.StreamId].TrySetResult(null);
};
+ _waitForAbortWithDataApplication = async context =>
+ {
+ var streamIdFeature = context.Features.Get();
+ var sem = new SemaphoreSlim(0);
+
+ context.RequestAborted.Register(() =>
+ {
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamIdFeature.StreamId);
+ }
+
+ sem.Release();
+ });
+
+ await sem.WaitAsync().DefaultTimeout();
+
+ await context.Response.Body.WriteAsync(new byte[10], 0, 10);
+
+ _runningStreams[streamIdFeature.StreamId].TrySetResult(null);
+ };
+
_hpackDecoder = new HPackDecoder((int)_clientSettings.HeaderTableSize);
_logger = new TestApplicationErrorLogger();
@@ -465,6 +488,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 3);
+ // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop.
+ // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing
+ // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to
+ // acquire the _writeLock.
+ await ThreadPoolAwaitable.Instance;
+
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
Assert.Equal(stream1DataFrame1.DataPayload, _helloBytes);
@@ -536,6 +565,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
+ // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop.
+ // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing
+ // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to
+ // acquire the _writeLock.
+ await ThreadPoolAwaitable.Instance;
+
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
}
@@ -752,6 +787,122 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
expectedErrorMessage: CoreStrings.FormatHttp2ErrorStreamClosed(Http2FrameType.DATA, streamId: 1));
}
+ [Fact]
+ public async Task DATA_Sent_DespiteConnectionBackpressure_IfEmptyAndEndsStream()
+ {
+ // Zero-length data frames are allowed to be sent even if there is no space available in the flow control window.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
+
+ var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length;
+ var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length;
+ var remainingBytesAfterBackpressure = _maxData.Length - remainingBytesBeforeBackpressure;
+
+ // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test.
+ _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamId = context.Features.Get().StreamId;
+
+ try
+ {
+ if (streamId == 1)
+ {
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure + 1; i++)
+ {
+ await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length);
+ }
+ }
+
+ _runningStreams[streamId].SetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamId].SetException(ex);
+ throw;
+ }
+ });
+
+ // Start one stream that consumes the entire connection output window.
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: _maxData.Length,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ }
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesBeforeBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ // Start one more stream that receives an empty response despite connection backpressure.
+ await StartStreamAsync(3, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 55,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 3);
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 3);
+
+ // Relieve connection backpressure to receive the rest of the first streams body.
+ await SendWindowUpdateAsync(0, remainingBytesAfterBackpressure);
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesAfterBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 1);
+
+ // Ensure that Http2FrameWriter._writeLock isn't acquired when completing the frame processing loop.
+ // Otherwise there's a deadlock where Http2OutputProducer.Abort() being called from the frame processing
+ // loop blocks waiting Http2OutputProducer.Dispose() being called from the stream processing loop to
+ // acquire the _writeLock.
+ await ThreadPoolAwaitable.Instance;
+
+ await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
+ await WaitForAllStreamsAsync();
+ }
+
+ [Fact]
+ public async Task DATA_Sent_DespiteStreamBackpressure_IfEmptyAndEndsStream()
+ {
+ // Zero-length data frames are allowed to be sent even if there is no space available in the flow control window.
+ // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
+
+ // This only affects the stream windows. The connection-level window is always initialized at 64KiB.
+ _clientSettings.InitialWindowSize = 0;
+
+ await InitializeConnectionAsync(_noopApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 55,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 1);
+
+ await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+ }
+
[Fact]
public async Task HEADERS_Received_Decoded()
{
@@ -1197,7 +1348,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headerBlock);
await WaitForStreamErrorAsync(
- ignoreNonRstStreamFrames: false,
expectedStreamId: 1,
expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR,
expectedErrorMessage: CoreStrings.Http2ErrorHeaderNameUppercase);
@@ -1286,7 +1436,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headers);
await WaitForStreamErrorAsync(
- ignoreNonRstStreamFrames: false,
expectedStreamId: 1,
expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR,
expectedErrorMessage: expectedErrorMessage);
@@ -1740,33 +1889,351 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
await SendRstStreamAsync(1);
-
- // No data is received from the stream since it was aborted before writing anything
-
- await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
-
await WaitForAllStreamsAsync();
Assert.Contains(1, _abortedStreamIds);
+
+ await SendGoAwayAsync();
+
+ // No data is received from the stream since it was aborted before writing anything
+ await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ // TODO: Check logs
}
[Fact]
- public async Task RST_STREAM_Received_AbortsStream_FlushedDataIsSent()
+ public async Task RST_STREAM_Received_AbortsStream_FlushedHeadersNotSent()
{
await InitializeConnectionAsync(_waitForAbortFlushingApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
await SendRstStreamAsync(1);
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+
+ await SendGoAwayAsync();
+
+ // No END_STREAM HEADERS or DATA frame is received since the stream was aborted
+ await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ // TODO: Check logs
+ }
+
+ [Fact]
+ public async Task RST_STREAM_Received_AbortsStream_FlushedDataNotSent()
+ {
+ await InitializeConnectionAsync(_waitForAbortWithDataApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+ await SendRstStreamAsync(1);
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+
+ await SendGoAwayAsync();
+
+ // No END_STREAM HEADERS or DATA frame is received since the stream was aborted
+ await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ // TODO: Check logs
+ }
+
+ [Fact]
+ public async Task RST_STREAM_Received_RelievesConnectionBackpressure()
+ {
+ var writeTasks = new Task[4];
+
+ var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length;
+ var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length;
+
+ // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test.
+ _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamId = context.Features.Get().StreamId;
+
+ var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ context.RequestAborted.Register(() =>
+ {
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamId);
+ abortedTcs.SetResult(null);
+ }
+ });
+
+ try
+ {
+ writeTasks[streamId] = writeTcs.Task;
+
+ // Flush headers even if the body can't yet be written because of flow control.
+ await context.Response.Body.FlushAsync();
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length);
+ }
+
+ await context.Response.Body.WriteAsync(_maxData, 0, remainingBytesBeforeBackpressure + 1);
+
+ writeTcs.SetResult(null);
+
+ await abortedTcs.Task;
+
+ _runningStreams[streamId].SetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamId].SetException(ex);
+ throw;
+ }
+ });
+
+ // Start one stream that consumes the entire connection output window.
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
- // No END_STREAM DATA frame is received since the stream was aborted
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: _maxData.Length,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ }
- await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesBeforeBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ // Ensure connection-level backpressure was hit.
+ Assert.False(writeTasks[1].IsCompleted);
+
+ // Start another stream that immediately experiences backpressure.
+ await StartStreamAsync(3, _browserRequestHeaders, endStream: true);
+
+ // The headers, but not the data for stream 3, can be sent prior to any window updates.
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 3);
+
+ await SendRstStreamAsync(1);
+ // Any paused writes for stream 1 should complete after an RST_STREAM
+ // even without any preceeding window updates.
+ await _runningStreams[1].Task.DefaultTimeout();
+
+ // A connection-level window update allows the non-reset stream to continue.
+ await SendWindowUpdateAsync(0, (int)Http2PeerSettings.DefaultInitialWindowSize);
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: _maxData.Length,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 3);
+ }
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesBeforeBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 3);
+
+ Assert.False(writeTasks[3].IsCompleted);
+
+ await SendRstStreamAsync(3);
+ await _runningStreams[3].Task.DefaultTimeout();
+
+ await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
+
+ await WaitForAllStreamsAsync();
Assert.Contains(1, _abortedStreamIds);
+ Assert.Contains(3, _abortedStreamIds);
+ }
+
+ [Fact]
+ public async Task RST_STREAM_Received_RelievesStreamBackpressure()
+ {
+ var writeTasks = new Task[6];
+ var initialWindowSize = _helloWorldBytes.Length / 2;
+
+ // This only affects the stream windows. The connection-level window is always initialized at 64KiB.
+ _clientSettings.InitialWindowSize = (uint)initialWindowSize;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamId = context.Features.Get().StreamId;
+
+ var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ context.RequestAborted.Register(() =>
+ {
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamId);
+ abortedTcs.SetResult(null);
+ }
+ });
+
+ try
+ {
+ writeTasks[streamId] = writeTcs.Task;
+ await context.Response.Body.WriteAsync(_helloWorldBytes, 0, _helloWorldBytes.Length);
+ writeTcs.SetResult(null);
+
+ await abortedTcs.Task;
+
+ _runningStreams[streamId].SetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamId].SetException(ex);
+ throw;
+ }
+ });
+
+ async Task VerifyStreamBackpressure(int streamId)
+ {
+ await StartStreamAsync(streamId, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: streamId);
+
+ var dataFrame = await ExpectAsync(Http2FrameType.DATA,
+ withLength: initialWindowSize,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: streamId);
+
+ Assert.Equal(dataFrame.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize));
+ Assert.False(writeTasks[streamId].IsCompleted);
+ }
+
+ await VerifyStreamBackpressure(1);
+ await VerifyStreamBackpressure(3);
+ await VerifyStreamBackpressure(5);
+
+ await SendRstStreamAsync(1);
+ await writeTasks[1].DefaultTimeout();
+ Assert.False(writeTasks[3].IsCompleted);
+ Assert.False(writeTasks[5].IsCompleted);
+
+ await SendRstStreamAsync(3);
+ await writeTasks[3].DefaultTimeout();
+ Assert.False(writeTasks[5].IsCompleted);
+
+ await SendRstStreamAsync(5);
+ await writeTasks[5].DefaultTimeout();
+
+ await StopConnectionAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false);
+
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+ Assert.Contains(3, _abortedStreamIds);
+ Assert.Contains(5, _abortedStreamIds);
+ }
+
+ [Fact]
+ public async Task RST_STREAM_WaitingForRequestBody_RequestBodyThrows()
+ {
+ var sem = new SemaphoreSlim(0);
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamIdFeature = context.Features.Get();
+
+ try
+ {
+ var readTask = context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout();
+ sem.Release();
+ await readTask;
+
+ _runningStreams[streamIdFeature.StreamId].TrySetException(new Exception("ReadAsync was expected to throw."));
+ }
+ catch (IOException) // Expected failure
+ {
+ await context.Response.Body.WriteAsync(new byte[10], 0, 10);
+
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamIdFeature.StreamId);
+ }
+
+ _runningStreams[streamIdFeature.StreamId].TrySetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamIdFeature.StreamId].TrySetException(ex);
+ }
+ });
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
+ await sem.WaitAsync().DefaultTimeout();
+ await SendRstStreamAsync(1);
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+
+ await SendGoAwayAsync();
+
+ // No data is received from the stream since it was aborted before writing anything
+ await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ // TODO: Check logs
+ }
+
+ [Fact]
+ public async Task RST_STREAM_IncompleteRequest_RequestBodyThrows()
+ {
+ var sem = new SemaphoreSlim(0);
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamIdFeature = context.Features.Get();
+
+ try
+ {
+ var read = await context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout();
+ var readTask = context.Request.Body.ReadAsync(new byte[100], 0, 100).DefaultTimeout();
+ sem.Release();
+ await readTask;
+
+ _runningStreams[streamIdFeature.StreamId].TrySetException(new Exception("ReadAsync was expected to throw."));
+ }
+ catch (IOException) // Expected failure
+ {
+ await context.Response.Body.WriteAsync(new byte[10], 0, 10);
+
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamIdFeature.StreamId);
+ }
+
+ _runningStreams[streamIdFeature.StreamId].TrySetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamIdFeature.StreamId].TrySetException(ex);
+ }
+ });
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
+ await SendDataAsync(1, new byte[10], endStream: false);
+ await sem.WaitAsync().DefaultTimeout();
+ await SendRstStreamAsync(1);
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+
+ await SendGoAwayAsync();
+
+ // No data is received from the stream since it was aborted before writing anything
+ await WaitForConnectionStopAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ // TODO: Check logs
}
[Fact]
@@ -1940,6 +2407,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
expectedErrorMessage: CoreStrings.Http2ErrorSettingsLengthNotMultipleOfSix);
}
+ [Fact]
+ public async Task SETTINGS_Received_WithInitialWindowSizePushingStreamWindowOverMax_ConnectionError()
+ {
+ await InitializeConnectionAsync(_waitForAbortApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
+
+ await SendWindowUpdateAsync(1, (int)(Http2PeerSettings.MaxWindowSize - _clientSettings.InitialWindowSize));
+
+ _clientSettings.InitialWindowSize += 1;
+ await SendSettingsAsync();
+
+ await WaitForConnectionErrorAsync(
+ ignoreNonGoAwayFrames: false,
+ expectedLastStreamId: 1,
+ expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR,
+ expectedErrorMessage: CoreStrings.Http2ErrorInitialWindowSizeInvalid);
+ }
+
[Fact]
public async Task PUSH_PROMISE_Received_ConnectionError()
{
@@ -2055,6 +2541,180 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.Contains(5, _abortedStreamIds);
}
+ [Fact]
+ public async Task GOAWAY_Received_RelievesConnectionBackpressure()
+ {
+ var writeTasks = new Task[6];
+ var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length;
+ var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length;
+
+ // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test.
+ _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamId = context.Features.Get().StreamId;
+
+ var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ context.RequestAborted.Register(() =>
+ {
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamId);
+ abortedTcs.SetResult(null);
+ }
+ });
+
+ try
+ {
+ writeTasks[streamId] = writeTcs.Task;
+
+ // Flush headers even if the body can't yet be written because of flow control.
+ await context.Response.Body.FlushAsync();
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length);
+ }
+
+ await context.Response.Body.WriteAsync(_maxData, 0, remainingBytesBeforeBackpressure + 1);
+
+ writeTcs.SetResult(null);
+
+ await abortedTcs.Task;
+
+ _runningStreams[streamId].SetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamId].SetException(ex);
+ throw;
+ }
+ });
+
+ // Start one stream that consumes the entire connection output window.
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: _maxData.Length,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ }
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesBeforeBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ Assert.False(writeTasks[1].IsCompleted);
+
+ // Start two more streams that immediately experience backpressure.
+ // The headers, but not the data for the stream, can still be sent.
+ await StartStreamAsync(3, _browserRequestHeaders, endStream: true);
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 3);
+
+ await StartStreamAsync(5, _browserRequestHeaders, endStream: true);
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 5);
+
+ await SendGoAwayAsync();
+
+ await WaitForConnectionStopAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false);
+
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+ Assert.Contains(3, _abortedStreamIds);
+ Assert.Contains(5, _abortedStreamIds);
+ }
+
+ [Fact]
+ public async Task GOAWAY_Received_RelievesStreamBackpressure()
+ {
+ var writeTasks = new Task[6];
+ var initialWindowSize = _helloWorldBytes.Length / 2;
+
+ // This only affects the stream windows. The connection-level window is always initialized at 64KiB.
+ _clientSettings.InitialWindowSize = (uint)initialWindowSize;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ var streamId = context.Features.Get().StreamId;
+
+ var abortedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var writeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ context.RequestAborted.Register(() =>
+ {
+ lock (_abortedStreamIdsLock)
+ {
+ _abortedStreamIds.Add(streamId);
+ abortedTcs.SetResult(null);
+ }
+ });
+
+ try
+ {
+ writeTasks[streamId] = writeTcs.Task;
+ await context.Response.Body.WriteAsync(_helloWorldBytes, 0, _helloWorldBytes.Length);
+ writeTcs.SetResult(null);
+
+ await abortedTcs.Task;
+
+ _runningStreams[streamId].SetResult(null);
+ }
+ catch (Exception ex)
+ {
+ _runningStreams[streamId].SetException(ex);
+ throw;
+ }
+ });
+
+ async Task VerifyStreamBackpressure(int streamId)
+ {
+ await StartStreamAsync(streamId, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: streamId);
+
+ var dataFrame = await ExpectAsync(Http2FrameType.DATA,
+ withLength: initialWindowSize,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: streamId);
+
+ Assert.Equal(dataFrame.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize));
+ Assert.False(writeTasks[streamId].IsCompleted);
+ }
+
+ await VerifyStreamBackpressure(1);
+ await VerifyStreamBackpressure(3);
+ await VerifyStreamBackpressure(5);
+
+ await SendGoAwayAsync();
+
+ await WaitForConnectionStopAsync(expectedLastStreamId: 5, ignoreNonGoAwayFrames: false);
+
+ await WaitForAllStreamsAsync();
+ Assert.Contains(1, _abortedStreamIds);
+ Assert.Contains(3, _abortedStreamIds);
+ Assert.Contains(5, _abortedStreamIds);
+ }
+
[Fact]
public async Task GOAWAY_Received_StreamIdNotZero_ConnectionError()
{
@@ -2174,6 +2834,230 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
expectedErrorMessage: CoreStrings.FormatHttp2ErrorStreamIdle(Http2FrameType.WINDOW_UPDATE, streamId: 1));
}
+ [Fact]
+ public async Task WINDOW_UPDATE_Received_OnConnection_IncreasesWindowAboveMaxValue_ConnectionError()
+ {
+ var maxIncrement = (int)(Http2PeerSettings.MaxWindowSize - Http2PeerSettings.DefaultInitialWindowSize);
+
+ await InitializeConnectionAsync(_noopApplication);
+
+ await SendWindowUpdateAsync(0, sizeIncrement: maxIncrement);
+ await SendWindowUpdateAsync(0, sizeIncrement: 1);
+
+ await WaitForConnectionErrorAsync(
+ ignoreNonGoAwayFrames: false,
+ expectedLastStreamId: 0,
+ expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR,
+ expectedErrorMessage: CoreStrings.Http2ErrorWindowUpdateSizeInvalid);
+ }
+
+ [Fact]
+ public async Task WINDOW_UPDATE_Received_OnStream_IncreasesWindowAboveMaxValue_StreamError()
+ {
+ var maxIncrement = (int)(Http2PeerSettings.MaxWindowSize - Http2PeerSettings.DefaultInitialWindowSize);
+
+ await InitializeConnectionAsync(_waitForAbortApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+ await SendWindowUpdateAsync(1, sizeIncrement: maxIncrement);
+ await SendWindowUpdateAsync(1, sizeIncrement: 1);
+
+ await WaitForStreamErrorAsync(
+ expectedStreamId: 1,
+ expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR,
+ expectedErrorMessage: CoreStrings.Http2ErrorWindowUpdateSizeInvalid);
+
+ await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+ }
+
+ [Fact]
+ public async Task WINDOW_UPDATE_Received_OnConnection_Respected()
+ {
+ var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length;
+
+ // Use this semaphore to wait until a new data frame is expected before trying to send it.
+ // This way we're sure that if Response.Body.WriteAsync returns an incomplete task, it's because
+ // of the flow control window and not Pipe backpressure.
+ var expectingDataSem = new SemaphoreSlim(0);
+ var backpressureObservedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var backpressureReleasedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ // Double the stream window to be 128KiB so it doesn't interfere with the rest of the test.
+ _clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
+
+ await InitializeConnectionAsync(async context =>
+ {
+ try
+ {
+ // Flush the headers so expectingDataSem is released.
+ await context.Response.Body.FlushAsync();
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ await expectingDataSem.WaitAsync();
+ Assert.True(context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length).IsCompleted);
+ }
+
+ await expectingDataSem.WaitAsync();
+ var lastWriteTask = context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length);
+
+ Assert.False(lastWriteTask.IsCompleted);
+ backpressureObservedTcs.TrySetResult(null);
+
+ await lastWriteTask;
+ backpressureReleasedTcs.TrySetResult(null);
+ }
+ catch (Exception ex)
+ {
+ backpressureObservedTcs.TrySetException(ex);
+ backpressureReleasedTcs.TrySetException(ex);
+ throw;
+ }
+ });
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+
+ for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++)
+ {
+ expectingDataSem.Release();
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: _maxData.Length,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+ }
+
+ var remainingBytesBeforeBackpressure = (int)Http2PeerSettings.DefaultInitialWindowSize % _maxData.Length;
+ var remainingBytesAfterBackpressure = _maxData.Length - remainingBytesBeforeBackpressure;
+
+ expectingDataSem.Release();
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesBeforeBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await backpressureObservedTcs.Task.DefaultTimeout();
+
+ await SendWindowUpdateAsync(0, remainingBytesAfterBackpressure);
+
+ await backpressureReleasedTcs.Task.DefaultTimeout();
+
+ // This is the remaining data that could have come in the last frame if not for the flow control window,
+ // so there's no need to release the semaphore again.
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: remainingBytesAfterBackpressure,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 1);
+
+ await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+ }
+
+ [Fact]
+ public async Task WINDOW_UPDATE_Received_OnStream_Respected()
+ {
+ var initialWindowSize = _helloWorldBytes.Length / 2;
+
+ // This only affects the stream windows. The connection-level window is always initialized at 64KiB.
+ _clientSettings.InitialWindowSize = (uint)initialWindowSize;
+
+ await InitializeConnectionAsync(_echoApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
+ await SendDataAsync(1, _helloWorldBytes, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+
+ var dataFrame1 = await ExpectAsync(Http2FrameType.DATA,
+ withLength: initialWindowSize,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await SendWindowUpdateAsync(1, initialWindowSize);
+
+ var dataFrame2 = await ExpectAsync(Http2FrameType.DATA,
+ withLength: initialWindowSize,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 1);
+
+ await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ Assert.Equal(dataFrame1.DataPayload, new ArraySegment(_helloWorldBytes, 0, initialWindowSize));
+ Assert.Equal(dataFrame2.DataPayload, new ArraySegment(_helloWorldBytes, initialWindowSize, initialWindowSize));
+ }
+
+ [Fact]
+ public async Task WINDOW_UPDATE_Received_OnStream_Respected_WhenInitialWindowSizeReducedMidStream()
+ {
+ // This only affects the stream windows. The connection-level window is always initialized at 64KiB.
+ _clientSettings.InitialWindowSize = 6;
+
+ await InitializeConnectionAsync(_echoApplication);
+
+ await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
+ await SendDataAsync(1, _helloWorldBytes, endStream: true);
+
+ await ExpectAsync(Http2FrameType.HEADERS,
+ withLength: 37,
+ withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
+ withStreamId: 1);
+
+ var dataFrame1 = await ExpectAsync(Http2FrameType.DATA,
+ withLength: 6,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ // Reduce the initial window size for response data by 3 bytes.
+ _clientSettings.InitialWindowSize = 3;
+ await SendSettingsAsync();
+
+ await ExpectAsync(Http2FrameType.SETTINGS,
+ withLength: 0,
+ withFlags: (byte)Http2SettingsFrameFlags.ACK,
+ withStreamId: 0);
+
+ await SendWindowUpdateAsync(1, 6);
+
+ var dataFrame2 = await ExpectAsync(Http2FrameType.DATA,
+ withLength: 3,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await SendWindowUpdateAsync(1, 3);
+
+ var dataFrame3 = await ExpectAsync(Http2FrameType.DATA,
+ withLength: 3,
+ withFlags: (byte)Http2DataFrameFlags.NONE,
+ withStreamId: 1);
+
+ await ExpectAsync(Http2FrameType.DATA,
+ withLength: 0,
+ withFlags: (byte)Http2DataFrameFlags.END_STREAM,
+ withStreamId: 1);
+
+ await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
+
+ Assert.Equal(dataFrame1.DataPayload, new ArraySegment(_helloWorldBytes, 0, 6));
+ Assert.Equal(dataFrame2.DataPayload, new ArraySegment(_helloWorldBytes, 6, 3));
+ Assert.Equal(dataFrame3.DataPayload, new ArraySegment(_helloWorldBytes, 9, 3));
+ }
+
[Fact]
public async Task CONTINUATION_Received_Decoded()
{
@@ -2313,7 +3197,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await SendEmptyContinuationFrameAsync(1, Http2ContinuationFrameFlags.END_HEADERS);
await WaitForStreamErrorAsync(
- ignoreNonRstStreamFrames: false,
expectedStreamId: 1,
expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR,
expectedErrorMessage: CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields);
@@ -3028,18 +3911,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_pair.Application.Output.Complete();
}
- private async Task WaitForStreamErrorAsync(bool ignoreNonRstStreamFrames, int expectedStreamId, Http2ErrorCode expectedErrorCode, string expectedErrorMessage)
+ private async Task WaitForStreamErrorAsync(int expectedStreamId, Http2ErrorCode expectedErrorCode, string expectedErrorMessage)
{
var frame = await ReceiveFrameAsync();
- if (ignoreNonRstStreamFrames)
- {
- while (frame.Type != Http2FrameType.RST_STREAM)
- {
- frame = await ReceiveFrameAsync();
- }
- }
-
Assert.Equal(Http2FrameType.RST_STREAM, frame.Type);
Assert.Equal(4, frame.Length);
Assert.Equal(0, frame.Flags);
diff --git a/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs b/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs
index ca916cc4ae..9e2db9ba26 100644
--- a/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs
+++ b/test/Kestrel.FunctionalTests/Http2/H2SpecTests.cs
@@ -54,8 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.Http2
get
{
var dataset = new TheoryData();
- var toSkip = new[] { "hpack/4.2/1", "http2/5.1/8", "http2/6.9.1/2", "http2/6.9.1/3", "http2/8.1.2.3/1",
- "http2/8.1.2.6/1", "http2/8.1.2.6/2" };
+ var toSkip = new[] { "hpack/4.2/1", "http2/5.1/8", "http2/8.1.2.3/1", "http2/8.1.2.6/1", "http2/8.1.2.6/2" };
foreach (var testcase in H2SpecCommands.EnumerateTestCases())
{