Implement HTTP/2 input flow control (#2740)

This commit is contained in:
Stephen Halter 2018-07-27 10:47:27 -07:00 committed by GitHub
parent 1f3524787e
commit 94cfc01fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 921 additions and 172 deletions

View File

@ -545,6 +545,9 @@ For more information on configuring HTTPS see https://go.microsoft.com/fwlink/?l
<data name="Http2StreamAborted" xml:space="preserve">
<value>The request stream was aborted.</value>
</data>
<data name="Http2ErrorFlowControlWindowExceeded" xml:space="preserve">
<value>The client sent more data than what was available in the flow-control window.</value>
</data>
<data name="Http2ErrorConnectMustNotSendSchemeOrPath" xml:space="preserve">
<value>CONNECT requests must not send :scheme or :path headers.</value>
</data>

View File

@ -44,6 +44,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_keepAliveTicks = ServerOptions.Limits.KeepAliveTimeout.Ticks;
_requestHeadersTimeoutTicks = ServerOptions.Limits.RequestHeadersTimeout.Ticks;
RequestBodyPipe = CreateRequestBodyPipe();
Output = new Http1OutputProducer(
_context.Transport.Output,
_context.ConnectionId,
@ -470,5 +471,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return false;
}
}
private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
}
}

View File

@ -73,12 +73,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
ServerOptions = ServiceContext.ServerOptions;
HttpResponseControl = this;
RequestBodyPipe = CreateRequestBodyPipe();
}
public IHttpResponseControl HttpResponseControl { get; set; }
public Pipe RequestBodyPipe { get; }
public Pipe RequestBodyPipe { get; protected set; }
public ServiceContext ServiceContext => _context.ServiceContext;
private IPEndPoint LocalEndPoint => _context.LocalEndPoint;
@ -1332,17 +1331,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
Log.ApplicationError(ConnectionId, TraceIdentifier, ex);
}
private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
}
}

View File

@ -45,16 +45,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
var result = await _context.RequestBodyPipe.Reader.ReadAsync();
var readableBuffer = result.Buffer;
var consumed = readableBuffer.End;
var actual = 0;
try
{
if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
var actual = (int)Math.Min(readableBuffer.Length, buffer.Length);
// buffer.Count is int
actual = (int)Math.Min(readableBuffer.Length, buffer.Length);
var slice = readableBuffer.Slice(0, actual);
consumed = readableBuffer.GetPosition(actual);
slice.CopyTo(buffer.Span);
return actual;
}
@ -66,6 +68,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
finally
{
_context.RequestBodyPipe.Reader.AdvanceTo(consumed);
// Update the flow-control window after advancing the pipe reader, so we don't risk overfilling
// the pipe despite the client being well-behaved.
OnDataRead(actual);
}
}
}
@ -79,6 +85,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
var result = await _context.RequestBodyPipe.Reader.ReadAsync();
var readableBuffer = result.Buffer;
var consumed = readableBuffer.End;
var bytesRead = 0;
try
{
@ -89,6 +96,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
// REVIEW: This *could* be slower if 2 things are true
// - The WriteAsync(ReadOnlyMemory<byte>) isn't overridden on the destination
// - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory
bytesRead += memory.Length;
#if NETCOREAPP2_1
await destination.WriteAsync(memory);
#elif NETSTANDARD2_0
@ -108,6 +118,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
finally
{
_context.RequestBodyPipe.Reader.AdvanceTo(consumed);
// Update the flow-control window after advancing the pipe reader, so we don't risk overfilling
// the pipe despite the client being well-behaved.
OnDataRead(bytesRead);
}
}
}
@ -150,6 +164,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
}
protected virtual void OnDataRead(int bytesRead)
{
}
private class ForZeroContentLength : MessageBody
{
public ForZeroContentLength(bool keepAlive)

View File

@ -1,38 +1,23 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class Http2OutputFlowControl
public struct FlowControl
{
private readonly Queue<Http2OutputFlowControlAwaitable> _awaitableQueue = new Queue<Http2OutputFlowControlAwaitable>();
public Http2OutputFlowControl(uint initialWindowSize)
public FlowControl(uint initialWindowSize)
{
Debug.Assert(initialWindowSize <= Http2PeerSettings.MaxWindowSize, $"{nameof(initialWindowSize)} too large.");
Available = (int)initialWindowSize;
IsAborted = false;
}
public int Available { get; private set; }
public bool IsAborted { get; private set; }
public Http2OutputFlowControlAwaitable AvailabilityAwaitable
{
get
{
Debug.Assert(!IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
Debug.Assert(Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");
var awaitable = new Http2OutputFlowControlAwaitable();
_awaitableQueue.Enqueue(awaitable);
return awaitable;
}
}
public void Advance(int bytes)
{
Debug.Assert(!IsAborted, $"({nameof(Advance)} called after abort.");
@ -55,23 +40,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
Available += bytes;
while (Available > 0 && _awaitableQueue.Count > 0)
{
var awaitable = _awaitableQueue.Dequeue();
awaitable.Complete();
}
return true;
}
public void Abort()
{
IsAborted = true;
while (_awaitableQueue.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}
}
}
}

View File

@ -0,0 +1,116 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Diagnostics;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class InputFlowControl
{
private readonly int _initialWindowSize;
private readonly int _minWindowSizeIncrement;
private FlowControl _flow;
private int _pendingUpdateSize;
private bool _windowUpdatesDisabled;
private readonly object _flowLock = new object();
public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement)
{
Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size.");
_flow = new FlowControl(initialWindowSize);
_initialWindowSize = (int)initialWindowSize;
_minWindowSizeIncrement = (int)minWindowSizeIncrement;
}
public bool TryAdvance(int bytes)
{
lock (_flowLock)
{
// Even if the stream is aborted, the client should never send more data than was available in the
// flow-control window at the time of the abort.
if (bytes > _flow.Available)
{
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR);
}
if (_flow.IsAborted)
{
// This data won't be read by the app, so tell the caller to count the data as already consumed.
return false;
}
_flow.Advance(bytes);
return true;
}
}
public bool TryUpdateWindow(int bytes, out int updateSize)
{
lock (_flowLock)
{
updateSize = 0;
if (_flow.IsAborted)
{
// All data received by stream has already been returned to the connection window.
return false;
}
if (!_flow.TryUpdateWindow(bytes))
{
// We only try to update the window back to its initial size after the app consumes data.
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
}
if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}
var potentialUpdateSize = _pendingUpdateSize + bytes;
if (potentialUpdateSize > _minWindowSizeIncrement)
{
_pendingUpdateSize = 0;
updateSize = potentialUpdateSize;
}
else
{
_pendingUpdateSize = potentialUpdateSize;
}
return true;
}
}
public void StopWindowUpdates()
{
lock (_flowLock)
{
_windowUpdatesDisabled = true;
}
}
public int Abort()
{
lock (_flowLock)
{
if (_flow.IsAborted)
{
return 0;
}
_flow.Abort();
// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return _initialWindowSize - _flow.Available;
}
}
}
}

View File

@ -0,0 +1,74 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Diagnostics;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class OutputFlowControl
{
private FlowControl _flow;
private Queue<OutputFlowControlAwaitable> _awaitableQueue;
public OutputFlowControl(uint initialWindowSize)
{
_flow = new FlowControl(initialWindowSize);
}
public int Available => _flow.Available;
public bool IsAborted => _flow.IsAborted;
public OutputFlowControlAwaitable AvailabilityAwaitable
{
get
{
Debug.Assert(!_flow.IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
Debug.Assert(_flow.Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");
if (_awaitableQueue == null)
{
_awaitableQueue = new Queue<OutputFlowControlAwaitable>();
}
var awaitable = new OutputFlowControlAwaitable();
_awaitableQueue.Enqueue(awaitable);
return awaitable;
}
}
public void Advance(int bytes)
{
_flow.Advance(bytes);
}
// bytes can be negative when SETTINGS_INITIAL_WINDOW_SIZE decreases mid-connection.
// This can also cause Available to become negative which MUST be allowed.
// https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2
public bool TryUpdateWindow(int bytes)
{
if (_flow.TryUpdateWindow(bytes))
{
while (_flow.Available > 0 && _awaitableQueue?.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}
return true;
}
return false;
}
public void Abort()
{
// Make sure to set the aborted flag before running any continuations.
_flow.Abort();
while (_awaitableQueue?.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}
}
}
}

View File

@ -6,15 +6,15 @@ using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class Http2OutputFlowControlAwaitable : ICriticalNotifyCompletion
public class OutputFlowControlAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };
private Action _callback;
public Http2OutputFlowControlAwaitable GetAwaiter() => this;
public OutputFlowControlAwaitable GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
public void GetResult()

View File

@ -0,0 +1,92 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Diagnostics;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class StreamInputFlowControl
{
private readonly InputFlowControl _connectionLevelFlowControl;
private readonly InputFlowControl _streamLevelFlowControl;
private readonly int _streamId;
private readonly Http2FrameWriter _frameWriter;
public StreamInputFlowControl(
int streamId,
Http2FrameWriter frameWriter,
InputFlowControl connectionLevelFlowControl,
uint initialWindowSize,
uint minWindowSizeIncrement)
{
_connectionLevelFlowControl = connectionLevelFlowControl;
_streamLevelFlowControl = new InputFlowControl(initialWindowSize, minWindowSizeIncrement);
_streamId = streamId;
_frameWriter = frameWriter;
}
public void Advance(int bytes)
{
var connectionSucess = _connectionLevelFlowControl.TryAdvance(bytes);
Debug.Assert(connectionSucess, "Connection-level input flow control should never be aborted.");
if (!_streamLevelFlowControl.TryAdvance(bytes))
{
// The stream has already been aborted, so immediately count the bytes as read at the connection level.
UpdateConnectionWindow(bytes);
}
}
public void UpdateWindows(int bytes)
{
if (!_streamLevelFlowControl.TryUpdateWindow(bytes, out var streamWindowUpdateSize))
{
// Stream-level flow control was aborted. Any unread bytes have already been returned to the connection
// flow-control window by Abort().
return;
}
if (streamWindowUpdateSize > 0)
{
// Writing with the FrameWriter should only fail if given a canceled token, so just fire and forget.
_ = _frameWriter.WriteWindowUpdateAsync(_streamId, streamWindowUpdateSize);
}
UpdateConnectionWindow(bytes);
}
public void StopWindowUpdates()
{
_streamLevelFlowControl.StopWindowUpdates();
}
public void Abort()
{
var unreadBytes = _streamLevelFlowControl.Abort();
if (unreadBytes > 0)
{
// We assume that the app won't read the remaining data from the request body pipe.
// Even if the app does continue reading, _streamLevelFlowControl.TryUpdateWindow() will return false
// from now on which prevents double counting.
UpdateConnectionWindow(unreadBytes);
}
}
private void UpdateConnectionWindow(int bytes)
{
var connectionSucess = _connectionLevelFlowControl.TryUpdateWindow(bytes, out var connectionWindowUpdateSize);
Debug.Assert(connectionSucess, "Connection-level input flow control should never be aborted.");
if (connectionWindowUpdateSize > 0)
{
// Writing with the FrameWriter should only fail if given a canceled token, so just fire and forget.
_ = _frameWriter.WriteWindowUpdateAsync(0, connectionWindowUpdateSize);
}
}
}
}

View File

@ -5,19 +5,19 @@ using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class Http2StreamOutputFlowControl
public class StreamOutputFlowControl
{
private readonly Http2OutputFlowControl _connectionLevelFlowControl;
private readonly Http2OutputFlowControl _streamLevelFlowControl;
private readonly OutputFlowControl _connectionLevelFlowControl;
private readonly OutputFlowControl _streamLevelFlowControl;
private Http2OutputFlowControlAwaitable _currentConnectionLevelAwaitable;
private OutputFlowControlAwaitable _currentConnectionLevelAwaitable;
public Http2StreamOutputFlowControl(Http2OutputFlowControl connectionLevelFlowControl, uint initialWindowSize)
public StreamOutputFlowControl(OutputFlowControl connectionLevelFlowControl, uint initialWindowSize)
{
_connectionLevelFlowControl = connectionLevelFlowControl;
_streamLevelFlowControl = new Http2OutputFlowControl(initialWindowSize);
_streamLevelFlowControl = new OutputFlowControl(initialWindowSize);
}
public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available);
@ -30,7 +30,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_streamLevelFlowControl.Advance(bytes);
}
public int AdvanceUpToAndWait(long bytes, out Http2OutputFlowControlAwaitable awaitable)
public int AdvanceUpToAndWait(long bytes, out OutputFlowControlAwaitable awaitable)
{
var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available
? _connectionLevelFlowControl : _streamLevelFlowControl;

View File

@ -15,6 +15,7 @@ using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Net.Http.Headers;
@ -61,7 +62,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly Http2ConnectionContext _context;
private readonly Http2FrameWriter _frameWriter;
private readonly HPackDecoder _hpackDecoder;
private readonly Http2OutputFlowControl _outputFlowControl = new Http2OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
private readonly InputFlowControl _inputFlowControl = new InputFlowControl(Http2PeerSettings.DefaultInitialWindowSize, Http2PeerSettings.DefaultInitialWindowSize / 2);
private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(Http2PeerSettings.DefaultInitialWindowSize);
private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings();
private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings();
@ -200,6 +202,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
try
{
// Ensure aborting each stream doesn't result in unnecessary WINDOW_UPDATE frames being sent.
_inputFlowControl.StopWindowUpdates();
foreach (var stream in _streams.Values)
{
stream.Abort(connectionError);
@ -367,8 +372,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
throw new Http2ConnectionErrorException(CoreStrings.FormatHttp2ErrorStreamHalfClosedRemote(_incomingFrame.Type, stream.StreamId), Http2ErrorCode.STREAM_CLOSED);
}
return stream.OnDataAsync(_incomingFrame.DataPayload,
endStream: (_incomingFrame.DataFlags & Http2DataFrameFlags.END_STREAM) == Http2DataFrameFlags.END_STREAM);
return stream.OnDataAsync(_incomingFrame);
}
// If we couldn't find the stream, it was either alive previously but closed with
@ -460,13 +464,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
StreamLifetimeHandler = this,
ClientPeerSettings = _clientSettings,
FrameWriter = _frameWriter,
ConnectionInputFlowControl = _inputFlowControl,
ConnectionOutputFlowControl = _outputFlowControl,
TimeoutControl = this,
});
if ((_incomingFrame.HeadersFlags & Http2HeadersFrameFlags.END_STREAM) == Http2HeadersFrameFlags.END_STREAM)
{
await _currentHeadersStream.OnDataAsync(Constants.EmptyData, endStream: true);
_currentHeadersStream.OnEndStreamReceived();
}
_currentHeadersStream.Reset();
@ -741,9 +746,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
if (endHeaders)
{
var endStreamTask = _currentHeadersStream.OnDataAsync(Constants.EmptyData, endStream: true);
_currentHeadersStream.OnEndStreamReceived();
ResetRequestHeaderParsingState();
return endStreamTask;
}
return Task.CompletedTask;

View File

@ -10,6 +10,7 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
@ -25,7 +26,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly PipeWriter _outputWriter;
private readonly PipeReader _outputReader;
private readonly Http2OutputFlowControl _connectionOutputFlowControl;
private readonly OutputFlowControl _connectionOutputFlowControl;
private readonly StreamSafePipeFlusher _flusher;
private bool _completed;
@ -33,7 +34,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public Http2FrameWriter(
PipeWriter outputPipeWriter,
PipeReader outputPipeReader,
Http2OutputFlowControl connectionOutputFlowControl,
OutputFlowControl connectionOutputFlowControl,
ITimeoutControl timeoutControl)
{
_outputWriter = outputPipeWriter;
@ -86,7 +87,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_outgoingFrame.Length = _continueBytes.Length;
_continueBytes.CopyTo(_outgoingFrame.HeadersPayload);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
@ -128,7 +129,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public Task WriteDataAsync(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, bool endStream)
public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, bool endStream)
{
// The Length property of a ReadOnlySequence can be expensive, so we cache the value.
var dataLength = data.Length;
@ -190,14 +191,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
_outgoingFrame.Length = unwrittenPayloadLength;
_outputWriter.Write(_outgoingFrame.Raw);
return FlushUnsynchronizedAsync();
return _flusher.FlushAsync();
}
private async Task WriteDataAsyncAwaited(int streamId, Http2StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, long dataLength, bool endStream)
private async Task WriteDataAsyncAwaited(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, long dataLength, bool endStream)
{
while (dataLength > 0)
{
Http2OutputFlowControlAwaitable availabilityAwaitable;
OutputFlowControlAwaitable availabilityAwaitable;
var writeTask = Task.CompletedTask;
lock (_writeLock)
@ -239,12 +240,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
await ThreadPoolAwaitable.Instance;
}
public Task WriteWindowUpdateAsync(int streamId, int sizeIncrement)
{
lock (_writeLock)
{
_outgoingFrame.PrepareWindowUpdate(streamId, sizeIncrement);
return WriteFrameUnsynchronizedAsync();
}
}
public Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode)
{
lock (_writeLock)
{
_outgoingFrame.PrepareRstStream(streamId, errorCode);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
@ -254,7 +264,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
// TODO: actually send settings
_outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.NONE);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
@ -263,7 +273,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_writeLock)
{
_outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.ACK);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
@ -273,7 +283,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
_outgoingFrame.PreparePing(Http2PingFrameFlags.ACK);
payload.CopyTo(_outgoingFrame.Payload);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
@ -282,23 +292,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_writeLock)
{
_outgoingFrame.PrepareGoAway(lastStreamId, errorCode);
return WriteUnsynchronizedAsync(_outgoingFrame.Raw);
return WriteFrameUnsynchronizedAsync();
}
}
private Task WriteUnsynchronizedAsync(ReadOnlySpan<byte> data)
private Task WriteFrameUnsynchronizedAsync()
{
if (_completed)
{
return Task.CompletedTask;
}
_outputWriter.Write(data);
return FlushUnsynchronizedAsync();
}
private Task FlushUnsynchronizedAsync()
{
_outputWriter.Write(_outgoingFrame.Raw);
return _flusher.FlushAsync();
}
@ -310,7 +315,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public bool TryUpdateStreamWindow(Http2StreamOutputFlowControl flowControl, int bytes)
public bool TryUpdateStreamWindow(StreamOutputFlowControl flowControl, int bytes)
{
lock (_writeLock)
{
@ -318,7 +323,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public void AbortPendingStreamDataWrites(Http2StreamOutputFlowControl flowControl)
public void AbortPendingStreamDataWrites(StreamOutputFlowControl flowControl)
{
lock (_writeLock)
{

View File

@ -6,11 +6,11 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
public abstract class Http2MessageBody : MessageBody
public class Http2MessageBody : MessageBody
{
private readonly Http2Stream _context;
protected Http2MessageBody(Http2Stream context)
private Http2MessageBody(Http2Stream context)
: base(context)
{
_context = context;
@ -25,33 +25,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
protected override void OnDataRead(int bytesRead)
{
_context.OnDataRead(bytesRead);
}
protected override Task OnConsumeAsync() => Task.CompletedTask;
public override Task StopAsync()
{
_context.RequestBodyPipe.Reader.Complete();
_context.RequestBodyPipe.Writer.Complete();
return Task.CompletedTask;
}
public override Task StopAsync() => Task.CompletedTask;
public static MessageBody For(
HttpRequestHeaders headers,
Http2Stream context)
{
if (context.EndStreamReceived)
if (context.EndStreamReceived && !context.RequestBodyStarted)
{
return ZeroContentLengthClose;
}
return new ForHttp2(context);
}
private class ForHttp2 : Http2MessageBody
{
public ForHttp2(Http2Stream context)
: base(context)
{
}
return new Http2MessageBody(context);
}
}
}

View File

@ -9,6 +9,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
@ -22,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
// This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the
// FrameWriter's connection-level write lock.
private readonly Http2StreamOutputFlowControl _flowControl;
private readonly StreamOutputFlowControl _flowControl;
private readonly object _dataWriterLock = new object();
private readonly Pipe _dataPipe;
@ -34,7 +35,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public Http2OutputProducer(
int streamId,
Http2FrameWriter frameWriter,
Http2StreamOutputFlowControl flowControl,
StreamOutputFlowControl flowControl,
ITimeoutControl timeoutControl,
MemoryPool<byte> pool)
{

View File

@ -6,14 +6,12 @@ using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Abstractions;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.Extensions.Primitives;
using Microsoft.Net.Http.Headers;
@ -23,24 +21,35 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
{
private readonly Http2StreamContext _context;
private readonly Http2OutputProducer _http2Output;
private readonly Http2StreamOutputFlowControl _outputFlowControl;
private int _requestAborted;
private readonly StreamInputFlowControl _inputFlowControl;
private readonly StreamOutputFlowControl _outputFlowControl;
private StreamCompletionFlags _completionState;
private readonly object _completionLock = new object();
public Http2Stream(Http2StreamContext context)
: base(context)
{
_context = context;
_outputFlowControl = new Http2StreamOutputFlowControl(context.ConnectionOutputFlowControl, context.ClientPeerSettings.InitialWindowSize);
_inputFlowControl = new StreamInputFlowControl(
_context.StreamId,
_context.FrameWriter,
context.ConnectionInputFlowControl,
Http2PeerSettings.DefaultInitialWindowSize,
Http2PeerSettings.DefaultInitialWindowSize / 2);
_outputFlowControl = new StreamOutputFlowControl(context.ConnectionOutputFlowControl, context.ClientPeerSettings.InitialWindowSize);
_http2Output = new Http2OutputProducer(context.StreamId, context.FrameWriter, _outputFlowControl, context.TimeoutControl, context.MemoryPool);
RequestBodyPipe = CreateRequestBodyPipe();
Output = _http2Output;
}
public int StreamId => _context.StreamId;
public bool RequestBodyStarted { get; private set; }
public bool EndStreamReceived { get; private set; }
protected IHttp2StreamLifetimeHandler StreamLifetimeHandler => _context.StreamLifetimeHandler;
public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived;
public override bool IsUpgradableRequest => false;
@ -51,7 +60,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
protected override void OnRequestProcessingEnded()
{
StreamLifetimeHandler.OnStreamCompleted(StreamId);
TryApplyCompletionFlag(StreamCompletionFlags.RequestProcessingEnded);
RequestBodyPipe.Reader.Complete();
// The app can no longer read any more of the request body, so return any bytes that weren't read to the
// connection's flow-control window.
_inputFlowControl.Abort();
}
protected override string CreateRequestId()
@ -246,31 +261,61 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
public async Task OnDataAsync(ArraySegment<byte> data, bool endStream)
public Task OnDataAsync(Http2Frame dataFrame)
{
// TODO: content-length accounting
// TODO: flow-control
try
// Since padding isn't buffered, immediately count padding bytes as read for flow control purposes.
if (dataFrame.DataHasPadding)
{
if (data.Count > 0)
{
RequestBodyPipe.Writer.Write(data);
// Add 1 byte for the padding length prefix.
OnDataRead(dataFrame.DataPadLength + 1);
}
RequestBodyStarted = true;
await RequestBodyPipe.Writer.FlushAsync();
}
var payload = dataFrame.DataPayload;
var endStream = (dataFrame.DataFlags & Http2DataFrameFlags.END_STREAM) == Http2DataFrameFlags.END_STREAM;
if (payload.Count > 0)
{
RequestBodyStarted = true;
if (endStream)
{
EndStreamReceived = true;
RequestBodyPipe.Writer.Complete();
// No need to send any more window updates for this stream now that we've received all the data.
// Call before flushing the request body pipe, because that might induce a window update.
_inputFlowControl.StopWindowUpdates();
}
_inputFlowControl.Advance(payload.Count);
RequestBodyPipe.Writer.Write(payload);
var flushTask = RequestBodyPipe.Writer.FlushAsync();
// It shouldn't be possible for the RequestBodyPipe to fill up an return an incomplete task if
// _inputFlowControl.Advance() didn't throw.
Debug.Assert(flushTask.IsCompleted);
}
catch (Exception ex)
if (endStream)
{
RequestBodyPipe.Writer.Complete(ex);
OnEndStreamReceived();
}
return Task.CompletedTask;
}
public void OnEndStreamReceived()
{
TryApplyCompletionFlag(StreamCompletionFlags.EndStreamReceived);
RequestBodyPipe.Writer.Complete();
_inputFlowControl.StopWindowUpdates();
}
public void OnDataRead(int bytesRead)
{
_inputFlowControl.UpdateWindows(bytesRead);
}
public bool TryUpdateOutputWindow(int bytes)
@ -280,7 +325,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public override void Abort(ConnectionAbortedException abortReason)
{
if (Interlocked.Exchange(ref _requestAborted, 1) != 0)
if (!TryApplyCompletionFlag(StreamCompletionFlags.Aborted))
{
return;
}
@ -296,7 +341,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private void ResetAndAbort(ConnectionAbortedException abortReason, Http2ErrorCode error)
{
if (Interlocked.Exchange(ref _requestAborted, 1) != 0)
if (!TryApplyCompletionFlag(StreamCompletionFlags.Aborted))
{
return;
}
@ -315,6 +360,60 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
// Unblock the request body.
RequestBodyPipe.Writer.Complete(new IOException(CoreStrings.Http2StreamAborted, abortReason));
_inputFlowControl.Abort();
}
private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: Http2PeerSettings.DefaultInitialWindowSize,
resumeWriterThreshold: Http2PeerSettings.DefaultInitialWindowSize,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
private bool TryApplyCompletionFlag(StreamCompletionFlags completionState)
{
lock (_completionLock)
{
var lastCompletionState = _completionState;
_completionState |= completionState;
if (ShoulStopTrackingStream(_completionState) && !ShoulStopTrackingStream(lastCompletionState))
{
_context.StreamLifetimeHandler.OnStreamCompleted(StreamId);
}
return _completionState != lastCompletionState;
}
}
private static bool ShoulStopTrackingStream(StreamCompletionFlags completionState)
{
// This could be a single condition, but I think it reads better as two if's.
if ((completionState & StreamCompletionFlags.RequestProcessingEnded) == StreamCompletionFlags.RequestProcessingEnded)
{
if ((completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived ||
(completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted)
{
return true;
}
}
return false;
}
[Flags]
private enum StreamCompletionFlags
{
None = 0,
RequestProcessingEnded = 1,
EndStreamReceived = 2,
Aborted = 4,
}
}
}

View File

@ -5,6 +5,7 @@ using System.Buffers;
using System.Net;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
@ -21,7 +22,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public IHttp2StreamLifetimeHandler StreamLifetimeHandler { get; set; }
public Http2PeerSettings ClientPeerSettings { get; set; }
public Http2FrameWriter FrameWriter { get; set; }
public Http2OutputFlowControl ConnectionOutputFlowControl { get; set; }
public InputFlowControl ConnectionInputFlowControl { get; set; }
public OutputFlowControl ConnectionOutputFlowControl { get; set; }
public ITimeoutControl TimeoutControl { get; set; }
}
}

View File

@ -2002,6 +2002,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
internal static string FormatHttp2StreamAborted()
=> GetString("Http2StreamAborted");
/// <summary>
/// The client sent more data than what was available in the flow-control window.
/// </summary>
internal static string Http2ErrorFlowControlWindowExceeded
{
get => GetString("Http2ErrorFlowControlWindowExceeded");
}
/// <summary>
/// The client sent more data than what was available in the flow-control window.
/// </summary>
internal static string FormatHttp2ErrorFlowControlWindowExceeded()
=> GetString("Http2ErrorFlowControlWindowExceeded");
/// <summary>
/// CONNECT requests must not send :scheme or :path headers.
/// </summary>

View File

@ -100,11 +100,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private static readonly byte[] _noData = new byte[0];
private static readonly byte[] _maxData = Encoding.ASCII.GetBytes(new string('a', Http2Frame.MinAllowedMaxFrameSize));
private readonly MemoryPool<byte> _memoryPool = KestrelMemoryPool.Create();
private readonly DuplexPipe.DuplexPipePair _pair;
private readonly TestApplicationErrorLogger _logger;
private readonly Http2ConnectionContext _connectionContext;
private readonly Http2Connection _connection;
private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings();
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly HPackDecoder _hpackDecoder;
@ -126,28 +122,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private readonly RequestDelegate _waitForAbortFlushingApplication;
private readonly RequestDelegate _waitForAbortWithDataApplication;
private MemoryPool<byte> _memoryPool;
private DuplexPipe.DuplexPipePair _pair;
private Http2ConnectionContext _connectionContext;
private Http2Connection _connection;
private Task _connectionTask;
public Http2ConnectionTests()
{
// Always dispatch test code back to the ThreadPool. This prevents deadlocks caused by continuing
// Http2Connection.ProcessRequestsAsync() loop with writer locks acquired. Run product code inline to make
// it easier to verify request frames are processed correctly immediately after sending the them.
var inputPipeOptions = new PipeOptions(
pool: _memoryPool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.ThreadPool,
useSynchronizationContext: false
);
var outputPipeOptions = new PipeOptions(
pool: _memoryPool,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
useSynchronizationContext: false
);
_pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions);
_noopApplication = context => Task.CompletedTask;
_readHeadersApplication = context =>
@ -297,6 +280,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_logger = new TestApplicationErrorLogger();
InitializeConnectionFields(KestrelMemoryPool.Create());
}
private void InitializeConnectionFields(MemoryPool<byte> memoryPool)
{
_memoryPool = memoryPool;
// Always dispatch test code back to the ThreadPool. This prevents deadlocks caused by continuing
// Http2Connection.ProcessRequestsAsync() loop with writer locks acquired. Run product code inline to make
// it easier to verify request frames are processed correctly immediately after sending the them.
var inputPipeOptions = new PipeOptions(
pool: _memoryPool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.ThreadPool,
useSynchronizationContext: false
);
var outputPipeOptions = new PipeOptions(
pool: _memoryPool,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
useSynchronizationContext: false
);
_pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions);
_connectionContext = new Http2ConnectionContext
{
ConnectionFeatures = new FeatureCollection(),
@ -308,6 +316,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Application = _pair.Application,
Transport = _pair.Transport
};
_connection = new Http2Connection(_connectionContext);
}
@ -370,7 +379,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame.DataPayload, _helloWorldBytes);
Assert.Equal(_helloWorldBytes, dataFrame.DataPayload);
}
[Fact]
@ -385,6 +394,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
var dataFrame = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
@ -396,7 +406,85 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame.DataPayload, _maxData);
Assert.Equal(_maxData, dataFrame.DataPayload);
}
[Fact]
public async Task DATA_Received_GreaterThanDefaultInitialWindowSize_ReadByStream()
{
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
// Double the client stream windows to 128KiB so no stream WINDOW_UPDATEs need to be sent.
_clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
await InitializeConnectionAsync(_echoApplication);
// Double the client connection window to 128KiB.
await SendWindowUpdateAsync(0, (int)Http2PeerSettings.DefaultInitialWindowSize);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
var dataFrame1 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// Writing over half the initial window size induces both a connection-level and stream-level window update.
await SendDataAsync(1, _maxData, endStream: false);
var streamWindowUpdateFrame1 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
var connectionWindowUpdateFrame1 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
var dataFrame2 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendDataAsync(1, _maxData, endStream: false);
var dataFrame3 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendDataAsync(1, _maxData, endStream: true);
var connectionWindowUpdateFrame2 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
var dataFrame4 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(_maxData, dataFrame1.DataPayload);
Assert.Equal(_maxData, dataFrame2.DataPayload);
Assert.Equal(_maxData, dataFrame3.DataPayload);
Assert.Equal(_maxData, dataFrame4.DataPayload);
Assert.Equal(_maxData.Length * 2, streamWindowUpdateFrame1.WindowUpdateSizeIncrement);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame1.WindowUpdateSizeIncrement);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame2.WindowUpdateSizeIncrement);
}
[Fact]
@ -428,7 +516,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame.DataPayload, _helloWorldBytes);
Assert.Equal(_helloWorldBytes, dataFrame.DataPayload);
}
[Fact]
@ -497,6 +585,105 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.Equal(stream3DataFrame2.DataPayload, _worldBytes);
}
[Fact]
public async Task DATA_Received_Multiplexed_GreaterThanDefaultInitialWindowSize_ReadByStream()
{
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
// Double the client stream windows to 128KiB so no stream WINDOW_UPDATEs need to be sent.
_clientSettings.InitialWindowSize = Http2PeerSettings.DefaultInitialWindowSize * 2;
await InitializeConnectionAsync(_echoApplication);
// Double the client connection window to 128KiB.
await SendWindowUpdateAsync(0, (int)Http2PeerSettings.DefaultInitialWindowSize);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
var dataFrame1 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// Writing over half the initial window size induces both a connection-level and stream-level window update.
await SendDataAsync(1, _maxData, endStream: false);
var streamWindowUpdateFrame = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
var connectionWindowUpdateFrame1 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
var dataFrame2 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await SendDataAsync(1, _maxData, endStream: false);
var dataFrame3 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// Uploading data to a new stream induces a second connection-level but not stream-level window update.
await StartStreamAsync(3, _browserRequestHeaders, endStream: false);
await SendDataAsync(3, _maxData, endStream: true);
var connectionWindowUpdateFrame2 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 3);
var dataFrame4 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 3);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 3);
await SendDataAsync(1, _maxData, endStream: true);
var dataFrame5 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
await StopConnectionAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false);
Assert.Equal(_maxData, dataFrame1.DataPayload);
Assert.Equal(_maxData, dataFrame2.DataPayload);
Assert.Equal(_maxData, dataFrame3.DataPayload);
Assert.Equal(_maxData.Length * 2, streamWindowUpdateFrame.WindowUpdateSizeIncrement);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame1.WindowUpdateSizeIncrement);
Assert.Equal(_maxData, dataFrame4.DataPayload);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame2.WindowUpdateSizeIncrement);
Assert.Equal(_maxData, dataFrame5.DataPayload);
}
[Fact]
public async Task DATA_Received_Multiplexed_AppMustNotBlockOtherFrames()
{
@ -536,7 +723,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 3);
await ExpectAsync(Http2FrameType.DATA,
await ExpectAsync(Http2FrameType.DATA,
withLength: 5,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 3);
@ -589,7 +776,91 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame.DataPayload, _helloWorldBytes);
Assert.Equal(_helloWorldBytes, dataFrame.DataPayload);
}
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(255)]
public async Task DATA_Received_WithPadding_CountsTowardsInputFlowControl(byte padLength)
{
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
var maxDataMinusPadding = new ArraySegment<byte>(_maxData, 0, _maxData.Length - padLength - 1);
await InitializeConnectionAsync(_echoApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataWithPaddingAsync(1, maxDataMinusPadding, padLength, endStream: false);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 37,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
var dataFrame1 = await ExpectAsync(Http2FrameType.DATA,
withLength: maxDataMinusPadding.Count,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
// Writing over half the initial window size induces both a connection-level and stream-level window update.
await SendDataAsync(1, _maxData, endStream: true);
var connectionWindowUpdateFrame = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
var dataFrame2 = await ExpectAsync(Http2FrameType.DATA,
withLength: _maxData.Length,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(maxDataMinusPadding, dataFrame1.DataPayload);
Assert.Equal(_maxData, dataFrame2.DataPayload);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame.WindowUpdateSizeIncrement);
}
[Fact]
public async Task DATA_Received_ButNotConsumedByApp_CountsTowardsInputFlowControl()
{
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
await InitializeConnectionAsync(_noopApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await ExpectAsync(Http2FrameType.HEADERS,
withLength: 55,
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
withStreamId: 1);
await ExpectAsync(Http2FrameType.DATA,
withLength: 0,
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
withStreamId: 1);
// Writing over half the initial window size induces both a connection-level window update.
await SendDataAsync(1, _maxData, endStream: true);
var connectionWindowUpdateFrame = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(_maxData.Length * 2, connectionWindowUpdateFrame.WindowUpdateSizeIncrement);
}
[Fact]
@ -777,7 +1048,64 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task DATA_Sent_DespiteConnectionBackpressure_IfEmptyAndEndsStream()
public async Task DATA_Received_NoStreamWindowSpace_ConnectionError()
{
// I hate doing this, but it avoids exceptions from MemoryPool.Dipose() in debug mode. The problem is since
// the stream's ProcessRequestsAsync loop is never awaited by the connection, it's not really possible to
// observe when all the blocks are returned. This can be removed after we implement graceful shutdown.
Dispose();
InitializeConnectionFields(new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(), allowLateReturn: true));
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
await InitializeConnectionAsync(_waitForAbortApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await WaitForConnectionErrorAsync<Http2ConnectionErrorException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: 1,
expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR,
expectedErrorMessage: CoreStrings.Http2ErrorFlowControlWindowExceeded);
}
[Fact]
public async Task DATA_Received_NoConnectionWindowSpace_ConnectionError()
{
// I hate doing this, but it avoids exceptions from MemoryPool.Dipose() in debug mode. The problem is since
// the stream's ProcessRequestsAsync loop is never awaited by the connection, it's not really possible to
// observe when all the blocks are returned. This can be removed after we implement graceful shutdown.
Dispose();
InitializeConnectionFields(new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(), allowLateReturn: true));
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
await InitializeConnectionAsync(_waitForAbortApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await StartStreamAsync(3, _browserRequestHeaders, endStream: false);
await SendDataAsync(3, _maxData, endStream: false);
await SendDataAsync(3, _maxData, endStream: false);
await WaitForConnectionErrorAsync<Http2ConnectionErrorException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: 3,
expectedErrorCode: Http2ErrorCode.FLOW_CONTROL_ERROR,
expectedErrorMessage: CoreStrings.Http2ErrorFlowControlWindowExceeded);
}
[Fact]
public async Task DATA_Sent_DespiteConnectionOutputFlowControl_IfEmptyAndEndsStream()
{
// Zero-length data frames are allowed to be sent even if there is no space available in the flow control window.
// https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
@ -862,7 +1190,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task DATA_Sent_DespiteStreamBackpressure_IfEmptyAndEndsStream()
public async Task DATA_Sent_DespiteStreamOutputFlowControl_IfEmptyAndEndsStream()
{
// Zero-length data frames are allowed to be sent even if there is no space available in the flow control window.
// https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
@ -1561,7 +1889,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task RST_STREAM_Received_RelievesConnectionBackpressure()
public async Task RST_STREAM_Received_ContinuesAppsAwaitingConnectionOutputFlowControl()
{
var writeTasks = new Task[4];
@ -1681,7 +2009,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task RST_STREAM_Received_RelievesStreamBackpressure()
public async Task RST_STREAM_Received_ContinuesAppsAwaitingStreamOutputFlowControl()
{
var writeTasks = new Task[6];
var initialWindowSize = _helloWorldBytes.Length / 2;
@ -1736,7 +2064,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: streamId);
Assert.Equal(dataFrame.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize));
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize), dataFrame.DataPayload);
Assert.False(writeTasks[streamId].IsCompleted);
}
@ -1764,6 +2092,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.Contains(5, _abortedStreamIds);
}
[Fact]
public async Task RST_STREAM_Received_ReturnsSpaceToConnectionInputFlowControlWindow()
{
// _maxData should be 1/4th of the default initial window size + 1.
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
await InitializeConnectionAsync(_waitForAbortApplication);
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendDataAsync(1, _maxData, endStream: false);
await SendRstStreamAsync(1);
await WaitForAllStreamsAsync();
var connectionWindowUpdateFrame = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: 0);
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Contains(1, _abortedStreamIds);
Assert.Equal(_maxData.Length * 3, connectionWindowUpdateFrame.WindowUpdateSizeIncrement);
}
[Fact]
public async Task RST_STREAM_Received_StreamIdZero_ConnectionError()
{
@ -2070,7 +2425,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task GOAWAY_Received_RelievesConnectionBackpressure()
public async Task GOAWAY_Received_ContinuesAppsAwaitingConnectionOutputFlowControl()
{
var writeTasks = new Task[6];
var expectedFullFrameCountBeforeBackpressure = Http2PeerSettings.DefaultInitialWindowSize / _maxData.Length;
@ -2170,7 +2525,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
[Fact]
public async Task GOAWAY_Received_RelievesStreamBackpressure()
public async Task GOAWAY_Received_ContinuesAppsAwaitingStreamOutputFlowControle()
{
var writeTasks = new Task[6];
var initialWindowSize = _helloWorldBytes.Length / 2;
@ -2225,7 +2580,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
withFlags: (byte)Http2DataFrameFlags.NONE,
withStreamId: streamId);
Assert.Equal(dataFrame.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize));
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize), dataFrame.DataPayload);
Assert.False(writeTasks[streamId].IsCompleted);
}
@ -2526,8 +2881,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame1.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize));
Assert.Equal(dataFrame2.DataPayload, new ArraySegment<byte>(_helloWorldBytes, initialWindowSize, initialWindowSize));
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 0, initialWindowSize), dataFrame1.DataPayload);
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, initialWindowSize, initialWindowSize), dataFrame2.DataPayload);
}
[Fact]
@ -2581,9 +2936,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
Assert.Equal(dataFrame1.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 0, 6));
Assert.Equal(dataFrame2.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 6, 3));
Assert.Equal(dataFrame3.DataPayload, new ArraySegment<byte>(_helloWorldBytes, 9, 3));
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 0, 6), dataFrame1.DataPayload);
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 6, 3), dataFrame2.DataPayload);
Assert.Equal(new ArraySegment<byte>(_helloWorldBytes, 9, 3), dataFrame3.DataPayload);
}
[Fact]

View File

@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
lock (_pools)
{
var pool = new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(),_allowLateReturn, _rentTracking);
var pool = new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(), _allowLateReturn, _rentTracking);
_pools.Add(pool);
return pool;
}