Send window updates based on examined rather than consumed. (#8200)
This commit is contained in:
parent
16cd69f164
commit
17d072fa30
|
|
@ -14,6 +14,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
{
|
||||
private readonly Http2Stream _context;
|
||||
private ReadResult _readResult;
|
||||
private long _alreadyExaminedInNextReadResult;
|
||||
|
||||
private Http2MessageBody(Http2Stream context, MinDataRate minRequestBodyDataRate)
|
||||
: base(context, minRequestBodyDataRate)
|
||||
|
|
@ -63,14 +64,57 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
|
||||
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
|
||||
{
|
||||
var dataLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
|
||||
// This code path is fairly hard to understand so let's break it down with an example
|
||||
// ReadAsync returns a ReadResult of length 50.
|
||||
// Advance(25, 40). The examined length would be 40 and consumed length would be 25.
|
||||
// _totalExaminedInPreviousReadResult starts at 0. newlyExamined is 40.
|
||||
// OnDataRead is called with length 40.
|
||||
// _totalExaminedInPreviousReadResult is now 40 - 25 = 15.
|
||||
|
||||
// The next call to ReadAsync returns 50 again
|
||||
// Advance(5, 5) is called
|
||||
// newlyExamined is 5 - 15, or -10.
|
||||
// Update _totalExaminedInPreviousReadResult to 10 as we consumed 5.
|
||||
|
||||
// The next call to ReadAsync returns 50 again
|
||||
// _totalExaminedInPreviousReadResult is 10
|
||||
// Advance(50, 50) is called
|
||||
// newlyExamined = 50 - 10 = 40
|
||||
// _totalExaminedInPreviousReadResult is now 50
|
||||
// _totalExaminedInPreviousReadResult is finally 0 after subtracting consumedLength.
|
||||
|
||||
long examinedLength;
|
||||
long consumedLength;
|
||||
if (consumed.Equals(examined))
|
||||
{
|
||||
examinedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, examined).Length;
|
||||
consumedLength = examinedLength;
|
||||
}
|
||||
else
|
||||
{
|
||||
consumedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
|
||||
examinedLength = consumedLength + _readResult.Buffer.Slice(consumed, examined).Length;
|
||||
}
|
||||
|
||||
_context.RequestBodyPipe.Reader.AdvanceTo(consumed, examined);
|
||||
OnDataRead(dataLength);
|
||||
|
||||
var newlyExamined = examinedLength - _alreadyExaminedInNextReadResult;
|
||||
|
||||
if (newlyExamined > 0)
|
||||
{
|
||||
OnDataRead(newlyExamined);
|
||||
_alreadyExaminedInNextReadResult += newlyExamined;
|
||||
}
|
||||
|
||||
_alreadyExaminedInNextReadResult -= consumedLength;
|
||||
}
|
||||
|
||||
public override bool TryRead(out ReadResult readResult)
|
||||
{
|
||||
return _context.RequestBodyPipe.Reader.TryRead(out readResult);
|
||||
var result = _context.RequestBodyPipe.Reader.TryRead(out readResult);
|
||||
_readResult = readResult;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ using System.Text;
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Connections;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
|
||||
|
|
@ -682,6 +683,100 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
Assert.Equal(updateSize, connectionWindowUpdateFrame.WindowUpdateSizeIncrement);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DATA_BufferRequestBodyLargerThanStreamSizeSmallerThanConnectionPipe_Works()
|
||||
{
|
||||
var initialStreamWindowSize = _serviceContext.ServerOptions.Limits.Http2.InitialStreamWindowSize;
|
||||
var framesInStreamWindow = initialStreamWindowSize / Http2PeerSettings.DefaultMaxFrameSize;
|
||||
var initialConnectionWindowSize = _serviceContext.ServerOptions.Limits.Http2.InitialConnectionWindowSize;
|
||||
var framesInConnectionWindow = initialConnectionWindowSize / Http2PeerSettings.DefaultMaxFrameSize;
|
||||
|
||||
// Grow the client stream windows so no stream WINDOW_UPDATEs need to be sent.
|
||||
_clientSettings.InitialWindowSize = int.MaxValue;
|
||||
|
||||
await InitializeConnectionAsync(async context =>
|
||||
{
|
||||
await context.Response.BodyWriter.FlushAsync();
|
||||
var readResult = await context.Request.BodyReader.ReadAsync();
|
||||
while (readResult.Buffer.Length != _maxData.Length * 4)
|
||||
{
|
||||
context.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
|
||||
readResult = await context.Request.BodyReader.ReadAsync();
|
||||
}
|
||||
|
||||
context.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
|
||||
|
||||
readResult = await context.Request.BodyReader.ReadAsync();
|
||||
Assert.Equal(readResult.Buffer.Length, _maxData.Length * 5);
|
||||
|
||||
await context.Response.BodyWriter.WriteAsync(readResult.Buffer.ToArray());
|
||||
|
||||
context.Request.BodyReader.AdvanceTo(readResult.Buffer.End);
|
||||
});
|
||||
|
||||
// Grow the client connection windows so no connection WINDOW_UPDATEs need to be sent.
|
||||
await SendWindowUpdateAsync(0, int.MaxValue - (int)Http2PeerSettings.DefaultInitialWindowSize);
|
||||
|
||||
await StartStreamAsync(1, _browserRequestHeaders, endStream: false);
|
||||
|
||||
// Rounds down so we don't go over the half window size and trigger an update
|
||||
for (var i = 0; i < framesInStreamWindow / 2; i++)
|
||||
{
|
||||
await SendDataAsync(1, _maxData, endStream: false);
|
||||
}
|
||||
|
||||
// trip over the update size.
|
||||
await SendDataAsync(1, _maxData, endStream: false);
|
||||
|
||||
await ExpectAsync(Http2FrameType.HEADERS,
|
||||
withLength: 37,
|
||||
withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS,
|
||||
withStreamId: 1);
|
||||
|
||||
var dataFrames = new List<Http2FrameWithPayload>();
|
||||
|
||||
var streamWindowUpdateFrame1 = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
|
||||
withLength: 4,
|
||||
withFlags: (byte)Http2DataFrameFlags.NONE,
|
||||
withStreamId: 1);
|
||||
|
||||
// Writing over half the initial window size induces both a connection-level and stream-level window update.
|
||||
|
||||
await SendDataAsync(1, _maxData, endStream: true);
|
||||
|
||||
for (var i = 0; i < framesInStreamWindow / 2 + 2; i++)
|
||||
{
|
||||
var dataFrame3 = await ExpectAsync(Http2FrameType.DATA,
|
||||
withLength: _maxData.Length,
|
||||
withFlags: (byte)Http2DataFrameFlags.NONE,
|
||||
withStreamId: 1);
|
||||
dataFrames.Add(dataFrame3);
|
||||
}
|
||||
|
||||
var connectionWindowUpdateFrame = await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
|
||||
withLength: 4,
|
||||
withFlags: (byte)Http2DataFrameFlags.NONE,
|
||||
withStreamId: 0);
|
||||
// End
|
||||
|
||||
await ExpectAsync(Http2FrameType.DATA,
|
||||
withLength: 0,
|
||||
withFlags: (byte)Http2DataFrameFlags.END_STREAM,
|
||||
withStreamId: 1);
|
||||
|
||||
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
|
||||
|
||||
foreach (var frame in dataFrames)
|
||||
{
|
||||
Assert.True(_maxData.AsSpan().SequenceEqual(frame.PayloadSequence.ToArray()));
|
||||
}
|
||||
|
||||
var updateSize = ((framesInStreamWindow / 2) + 1) * _maxData.Length;
|
||||
Assert.Equal(updateSize, streamWindowUpdateFrame1.WindowUpdateSizeIncrement);
|
||||
updateSize = ((framesInConnectionWindow / 2) + 1) * _maxData.Length;
|
||||
Assert.Equal(updateSize, connectionWindowUpdateFrame.WindowUpdateSizeIncrement);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DATA_Received_StreamIdZero_ConnectionError()
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue