Keep Kestrel's connection PipeReader in a consistent state (#16725)
- When the request body PipeReader.ReadAsync throws, the connection-level pipe should be advanced, so subsequent attempts to read from the connection-level pipe don't fail unnecessarily
This commit is contained in:
parent
da20a12086
commit
e3b971a75e
|
|
@ -163,7 +163,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
}
|
||||
|
||||
// Read() will have already have greedily consumed the entire request body if able.
|
||||
CheckCompletedReadResult(result);
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
ThrowUnexpectedEndOfRequestContent();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
if (_readCompleted)
|
||||
{
|
||||
_isReading = true;
|
||||
return _readResult;
|
||||
return new ReadResult(_readResult.Buffer, Interlocked.Exchange(ref _userCanceled, 0) == 1, _readResult.IsCompleted);
|
||||
}
|
||||
|
||||
TryStart();
|
||||
|
|
@ -70,44 +70,47 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
}
|
||||
catch (ConnectionAbortedException ex)
|
||||
{
|
||||
_isReading = false;
|
||||
throw new TaskCanceledException("The request was aborted", ex);
|
||||
}
|
||||
|
||||
void ResetReadingState()
|
||||
{
|
||||
_isReading = false;
|
||||
// Reset the timing read here for the next call to read.
|
||||
StopTimingRead(0);
|
||||
_context.Input.AdvanceTo(_readResult.Buffer.Start);
|
||||
}
|
||||
|
||||
if (_context.RequestTimedOut)
|
||||
{
|
||||
ResetReadingState();
|
||||
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
|
||||
}
|
||||
|
||||
// Make sure to handle when this is canceled here.
|
||||
if (_readResult.IsCanceled)
|
||||
if (_readResult.IsCompleted)
|
||||
{
|
||||
if (Interlocked.Exchange(ref _userCanceled, 0) == 1)
|
||||
{
|
||||
// Ignore the readResult if it wasn't by the user.
|
||||
CreateReadResultFromConnectionReadResult();
|
||||
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Reset the timing read here for the next call to read.
|
||||
StopTimingRead(0);
|
||||
continue;
|
||||
}
|
||||
ResetReadingState();
|
||||
ThrowUnexpectedEndOfRequestContent();
|
||||
}
|
||||
|
||||
var readableBuffer = _readResult.Buffer;
|
||||
var readableBufferLength = readableBuffer.Length;
|
||||
StopTimingRead(readableBufferLength);
|
||||
|
||||
CheckCompletedReadResult(_readResult);
|
||||
|
||||
if (readableBufferLength > 0)
|
||||
// Ignore the canceled readResult if it wasn't canceled by the user.
|
||||
if (!_readResult.IsCanceled || Interlocked.Exchange(ref _userCanceled, 0) == 1)
|
||||
{
|
||||
CreateReadResultFromConnectionReadResult();
|
||||
var returnedReadResultLength = CreateReadResultFromConnectionReadResult();
|
||||
|
||||
// Don't count bytes belonging to the next request, since read rate timeouts are done on a per-request basis.
|
||||
StopTimingRead(returnedReadResultLength);
|
||||
|
||||
if (_readResult.IsCompleted)
|
||||
{
|
||||
TryStop();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
ResetReadingState();
|
||||
}
|
||||
|
||||
return _readResult;
|
||||
|
|
@ -129,66 +132,69 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
if (_readCompleted)
|
||||
{
|
||||
_isReading = true;
|
||||
readResult = _readResult;
|
||||
readResult = new ReadResult(_readResult.Buffer, Interlocked.Exchange(ref _userCanceled, 0) == 1, _readResult.IsCompleted);
|
||||
return true;
|
||||
}
|
||||
|
||||
TryStart();
|
||||
|
||||
if (!_context.Input.TryRead(out _readResult))
|
||||
// The while(true) because we don't want to return a canceled ReadResult if the user themselves didn't cancel it.
|
||||
while (true)
|
||||
{
|
||||
readResult = default;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_readResult.IsCanceled)
|
||||
{
|
||||
if (Interlocked.Exchange(ref _userCanceled, 0) == 0)
|
||||
if (!_context.Input.TryRead(out _readResult))
|
||||
{
|
||||
// Cancellation wasn't by the user, return default ReadResult
|
||||
readResult = default;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Only set _isReading if we are returing true.
|
||||
_isReading = true;
|
||||
if (!_readResult.IsCanceled || Interlocked.Exchange(ref _userCanceled, 0) == 1)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
CreateReadResultFromConnectionReadResult();
|
||||
|
||||
readResult = _readResult;
|
||||
CountBytesRead(readResult.Buffer.Length);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public override Task ConsumeAsync()
|
||||
{
|
||||
TryStart();
|
||||
|
||||
if (!_readResult.Buffer.IsEmpty && _inputLength == 0)
|
||||
{
|
||||
_context.Input.AdvanceTo(_readResult.Buffer.End);
|
||||
}
|
||||
|
||||
return OnConsumeAsync();
|
||||
}
|
||||
|
||||
private void CreateReadResultFromConnectionReadResult()
|
||||
{
|
||||
if (_readResult.Buffer.Length >= _inputLength + _examinedUnconsumedBytes)
|
||||
{
|
||||
_readCompleted = true;
|
||||
_readResult = new ReadResult(
|
||||
_readResult.Buffer.Slice(0, _inputLength + _examinedUnconsumedBytes),
|
||||
_readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 1,
|
||||
_readCompleted);
|
||||
_context.Input.AdvanceTo(_readResult.Buffer.Start);
|
||||
}
|
||||
|
||||
if (_readResult.IsCompleted)
|
||||
{
|
||||
_context.Input.AdvanceTo(_readResult.Buffer.Start);
|
||||
ThrowUnexpectedEndOfRequestContent();
|
||||
}
|
||||
|
||||
var returnedReadResultLength = CreateReadResultFromConnectionReadResult();
|
||||
|
||||
// Don't count bytes belonging to the next request, since read rate timeouts are done on a per-request basis.
|
||||
CountBytesRead(returnedReadResultLength);
|
||||
|
||||
// Only set _isReading if we are returning true.
|
||||
_isReading = true;
|
||||
readResult = _readResult;
|
||||
|
||||
if (readResult.IsCompleted)
|
||||
{
|
||||
TryStop();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private long CreateReadResultFromConnectionReadResult()
|
||||
{
|
||||
var initialLength = _readResult.Buffer.Length;
|
||||
var maxLength = _inputLength + _examinedUnconsumedBytes;
|
||||
|
||||
if (initialLength < maxLength)
|
||||
{
|
||||
return initialLength;
|
||||
}
|
||||
|
||||
_readCompleted = true;
|
||||
_readResult = new ReadResult(
|
||||
_readResult.Buffer.Slice(0, maxLength),
|
||||
_readResult.IsCanceled,
|
||||
isCompleted: true);
|
||||
|
||||
return maxLength;
|
||||
}
|
||||
|
||||
public override void AdvanceTo(SequencePosition consumed)
|
||||
|
|
@ -207,9 +213,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
if (_readCompleted)
|
||||
{
|
||||
_readResult = new ReadResult(_readResult.Buffer.Slice(consumed, _readResult.Buffer.End), Interlocked.Exchange(ref _userCanceled, 0) == 1, _readCompleted);
|
||||
// If the old stored _readResult was canceled, it's already been observed. Do not store a canceled read result permanently.
|
||||
_readResult = new ReadResult(_readResult.Buffer.Slice(consumed, _readResult.Buffer.End), isCanceled: false, _readCompleted);
|
||||
|
||||
if (_readResult.Buffer.Length == 0 && !_finalAdvanceCalled)
|
||||
if (!_finalAdvanceCalled && _readResult.Buffer.Length == 0)
|
||||
{
|
||||
_context.Input.AdvanceTo(consumed);
|
||||
_finalAdvanceCalled = true;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
|
@ -21,19 +22,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
_context = context;
|
||||
}
|
||||
|
||||
protected void CheckCompletedReadResult(ReadResult result)
|
||||
[StackTraceHidden]
|
||||
protected void ThrowUnexpectedEndOfRequestContent()
|
||||
{
|
||||
if (result.IsCompleted)
|
||||
{
|
||||
// OnInputOrOutputCompleted() is an idempotent method that closes the connection. Sometimes
|
||||
// input completion is observed here before the Input.OnWriterCompleted() callback is fired,
|
||||
// so we call OnInputOrOutputCompleted() now to prevent a race in our tests where a 400
|
||||
// response is written after observing the unexpected end of request content instead of just
|
||||
// closing the connection without a response as expected.
|
||||
_context.OnInputOrOutputCompleted();
|
||||
// OnInputOrOutputCompleted() is an idempotent method that closes the connection. Sometimes
|
||||
// input completion is observed here before the Input.OnWriterCompleted() callback is fired,
|
||||
// so we call OnInputOrOutputCompleted() now to prevent a race in our tests where a 400
|
||||
// response is written after observing the unexpected end of request content instead of just
|
||||
// closing the connection without a response as expected.
|
||||
_context.OnInputOrOutputCompleted();
|
||||
|
||||
BadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
|
||||
}
|
||||
BadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
|
||||
}
|
||||
|
||||
public abstract bool TryReadInternal(out ReadResult readResult);
|
||||
|
|
|
|||
|
|
@ -25,19 +25,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_completed)
|
||||
{
|
||||
throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
|
||||
}
|
||||
ThrowIfCompleted();
|
||||
return _context.Input.ReadAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override bool TryRead(out ReadResult result)
|
||||
{
|
||||
if (_completed)
|
||||
{
|
||||
throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
|
||||
}
|
||||
ThrowIfCompleted();
|
||||
return _context.Input.TryRead(out result);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1189,6 +1189,56 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnexpectedEndOfRequestContentIsRepeatedlyThrownForContentLengthBody()
|
||||
{
|
||||
using (var input = new TestInput())
|
||||
{
|
||||
var body = Http1MessageBody.For(HttpVersion.Http11, new HttpRequestHeaders { HeaderContentLength = "5" }, input.Http1Connection);
|
||||
var reader = new HttpRequestPipeReader();
|
||||
reader.StartAcceptingReads(body);
|
||||
|
||||
input.Application.Output.Complete();
|
||||
|
||||
var ex0 = Assert.Throws<BadHttpRequestException>(() => reader.TryRead(out var readResult));
|
||||
var ex1 = Assert.Throws<BadHttpRequestException>(() => reader.TryRead(out var readResult));
|
||||
var ex2 = await Assert.ThrowsAsync<BadHttpRequestException>(() => reader.ReadAsync().AsTask());
|
||||
var ex3 = await Assert.ThrowsAsync<BadHttpRequestException>(() => reader.ReadAsync().AsTask());
|
||||
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex0.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex1.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex2.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex3.Reason);
|
||||
|
||||
await body.StopAsync();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnexpectedEndOfRequestContentIsRepeatedlyThrownForChunkedBody()
|
||||
{
|
||||
using (var input = new TestInput())
|
||||
{
|
||||
var body = Http1MessageBody.For(HttpVersion.Http11, new HttpRequestHeaders { HeaderTransferEncoding = "chunked" }, input.Http1Connection);
|
||||
var reader = new HttpRequestPipeReader();
|
||||
reader.StartAcceptingReads(body);
|
||||
|
||||
input.Application.Output.Complete();
|
||||
|
||||
var ex0 = Assert.Throws<BadHttpRequestException>(() => reader.TryRead(out var readResult));
|
||||
var ex1 = Assert.Throws<BadHttpRequestException>(() => reader.TryRead(out var readResult));
|
||||
var ex2 = await Assert.ThrowsAsync<BadHttpRequestException>(() => reader.ReadAsync().AsTask());
|
||||
var ex3 = await Assert.ThrowsAsync<BadHttpRequestException>(() => reader.ReadAsync().AsTask());
|
||||
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex0.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex1.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex2.Reason);
|
||||
Assert.Equal(RequestRejectionReason.UnexpectedEndOfRequestContent, ex3.Reason);
|
||||
|
||||
await body.StopAsync();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteForChunkedAllowsConsumeToWork()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -19,11 +19,12 @@ using Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTransport
|
|||
using Microsoft.AspNetCore.Testing;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Testing;
|
||||
using Serilog;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
||||
{
|
||||
public class RequestTests : LoggedTest
|
||||
public class RequestTests : TestApplicationErrorLoggerLoggedTest
|
||||
{
|
||||
[Fact]
|
||||
public async Task StreamsAreNotPersistedAcrossRequests()
|
||||
|
|
@ -1440,6 +1441,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ContentLengthSwallowedUnexpectedEndOfRequestContentDoesNotResultInWarnings()
|
||||
{
|
||||
var testContext = new TestServiceContext(LoggerFactory);
|
||||
|
||||
await using (var server = new TestServer(async httpContext =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await httpContext.Request.Body.ReadAsync(new byte[1], 0, 1);
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
}, testContext))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
"Content-Length: 5",
|
||||
"",
|
||||
"");
|
||||
connection.ShutdownSend();
|
||||
|
||||
await connection.ReceiveEnd();
|
||||
}
|
||||
}
|
||||
|
||||
Assert.Empty(TestApplicationErrorLogger.Messages.Where(m => m.LogLevel >= LogLevel.Warning));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ContentLengthRequestCallCancelPendingReadWorks()
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue