Drain rejected h2 streams #17484 (#17917)

This commit is contained in:
Chris Ross 2019-12-17 19:30:51 -08:00 committed by GitHub
parent 3c0308fa6e
commit a386abcb2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 227 additions and 38 deletions

View File

@ -894,41 +894,57 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private void StartStream()
{
if (!_isMethodConnect && (_parsedPseudoHeaderFields & _mandatoryRequestPseudoHeaderFields) != _mandatoryRequestPseudoHeaderFields)
{
// All HTTP/2 requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header
// fields, unless it is a CONNECT request (Section 8.3). An HTTP request that omits mandatory pseudo-header
// fields is malformed (Section 8.1.2.6).
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields, Http2ErrorCode.PROTOCOL_ERROR);
}
if (_clientActiveStreamCount >= _serverSettings.MaxConcurrentStreams)
{
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2ErrorMaxStreams, Http2ErrorCode.REFUSED_STREAM);
}
// We don't use the _serverActiveRequestCount here as during shutdown, it and the dictionary
// counts get out of sync during shutdown. The streams still exist in the dictionary until the client responds with a RST or END_STREAM.
// Also, we care about the dictionary size for too much memory consumption.
if (_streams.Count >= _serverSettings.MaxConcurrentStreams * 2)
{
// Server is getting hit hard with connection resets.
// Tell client to calm down.
// TODO consider making when to send ENHANCE_YOUR_CALM configurable?
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2TellClientToCalmDown, Http2ErrorCode.ENHANCE_YOUR_CALM);
}
// This must be initialized before we offload the request or else we may start processing request body frames without it.
_currentHeadersStream.InputRemaining = _currentHeadersStream.RequestHeaders.ContentLength;
// This must wait until we've received all of the headers so we can verify the content-length.
if ((_headerFlags & Http2HeadersFrameFlags.END_STREAM) == Http2HeadersFrameFlags.END_STREAM)
{
_currentHeadersStream.OnEndStreamReceived();
}
// The stream now exists and must be tracked and drained even if Http2StreamErrorException is thrown before dispatching to the application.
_streams[_incomingFrame.StreamId] = _currentHeadersStream;
IncrementActiveClientStreamCount();
_serverActiveStreamCount++;
try
{
// This must be initialized before we offload the request or else we may start processing request body frames without it.
_currentHeadersStream.InputRemaining = _currentHeadersStream.RequestHeaders.ContentLength;
// This must wait until we've received all of the headers so we can verify the content-length.
// We also must set the proper EndStream state before rejecting the request for any reason.
if ((_headerFlags & Http2HeadersFrameFlags.END_STREAM) == Http2HeadersFrameFlags.END_STREAM)
{
_currentHeadersStream.OnEndStreamReceived();
}
if (!_isMethodConnect && (_parsedPseudoHeaderFields & _mandatoryRequestPseudoHeaderFields) != _mandatoryRequestPseudoHeaderFields)
{
// All HTTP/2 requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header
// fields, unless it is a CONNECT request (Section 8.3). An HTTP request that omits mandatory pseudo-header
// fields is malformed (Section 8.1.2.6).
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields, Http2ErrorCode.PROTOCOL_ERROR);
}
if (_clientActiveStreamCount > _serverSettings.MaxConcurrentStreams)
{
// The protocol default stream limit is infinite so the client can excede our limit at the start of the connection.
// Refused streams can be retried, by which time the client must have received our settings frame with our limit information.
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2ErrorMaxStreams, Http2ErrorCode.REFUSED_STREAM);
}
// We don't use the _serverActiveRequestCount here as during shutdown, it and the dictionary counts get out of sync.
// The streams still exist in the dictionary until the client responds with a RST or END_STREAM.
// Also, we care about the dictionary size for too much memory consumption.
if (_streams.Count > _serverSettings.MaxConcurrentStreams * 2)
{
// Server is getting hit hard with connection resets.
// Tell client to calm down.
// TODO consider making when to send ENHANCE_YOUR_CALM configurable?
throw new Http2StreamErrorException(_currentHeadersStream.StreamId, CoreStrings.Http2TellClientToCalmDown, Http2ErrorCode.ENHANCE_YOUR_CALM);
}
}
catch (Http2StreamErrorException)
{
MakeSpaceInDrainQueue();
// Tracked for draining
_completedStreams.Enqueue(_currentHeadersStream);
throw;
}
// Must not allow app code to block the connection handling loop.
ThreadPool.UnsafeQueueUserWorkItem(_currentHeadersStream, preferLocal: false);
}
@ -1025,6 +1041,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
}
}
// Compare to UpdateCompletedStreams, but only removes streams if over the max stream drain limit.
private void MakeSpaceInDrainQueue()
{
var maxStreams = _serverSettings.MaxConcurrentStreams * 2;
// If we're tracking too many streams, discard the oldest.
while (_streams.Count >= maxStreams && _completedStreams.TryDequeue(out var stream))
{
if (stream.DrainExpirationTicks == default)
{
_serverActiveStreamCount--;
}
_streams.Remove(stream.StreamId);
}
}
private void UpdateConnectionState()
{
if (_isClosed != 0)

View File

@ -955,7 +955,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public async Task Frame_MultipleStreams_RequestsNotFinished_EnhanceYourCalm()
{
_serviceContext.ServerOptions.Limits.Http2.MaxStreamsPerConnection = 1;
var tcs = new TaskCompletionSource<object>();
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await InitializeConnectionAsync(async context =>
{
await tcs.Task.DefaultTimeout();
@ -1719,14 +1719,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
await InitializeConnectionAsync(_noopApplication);
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headers);
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS | Http2HeadersFrameFlags.END_STREAM, headers);
await WaitForStreamErrorAsync(
expectedStreamId: 1,
expectedErrorCode: Http2ErrorCode.PROTOCOL_ERROR,
expectedErrorMessage: CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields);
// Verify that the stream ID can't be re-used
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, _browserRequestHeaders);
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS | Http2HeadersFrameFlags.END_STREAM, _browserRequestHeaders);
await WaitForConnectionErrorAsync<Http2ConnectionErrorException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: 1,
@ -3458,7 +3458,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
await InitializeConnectionAsync(_noopApplication);
Assert.True(await SendHeadersAsync(1, Http2HeadersFrameFlags.NONE, headers));
Assert.True(await SendHeadersAsync(1, Http2HeadersFrameFlags.END_STREAM, headers));
await SendEmptyContinuationFrameAsync(1, Http2ContinuationFrameFlags.END_HEADERS);
await WaitForStreamErrorAsync(
@ -3467,7 +3467,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
expectedErrorMessage: CoreStrings.Http2ErrorMissingMandatoryPseudoHeaderFields);
// Verify that the stream ID can't be re-used
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS, headers);
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_HEADERS | Http2HeadersFrameFlags.END_STREAM, headers);
await WaitForConnectionErrorAsync<Http2ConnectionErrorException>(
ignoreNonGoAwayFrames: false,
expectedLastStreamId: 1,
@ -3965,6 +3965,160 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
}
[Theory]
[InlineData((int)(Http2FrameType.DATA))]
[InlineData((int)(Http2FrameType.WINDOW_UPDATE))]
[InlineData((int)(Http2FrameType.HEADERS))]
[InlineData((int)(Http2FrameType.CONTINUATION))]
public async Task RefusedStream_Post_ResetsAndDrainsRequest(int intFinalFrameType)
{
var finalFrameType = (Http2FrameType)intFinalFrameType;
CreateConnection();
_connection.ServerSettings.MaxConcurrentStreams = 0; // Refuse all streams
var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(_noopApplication));
async Task CompletePipeOnTaskCompletion()
{
try
{
await connectionTask;
}
finally
{
_pair.Transport.Input.Complete();
_pair.Transport.Output.Complete();
}
}
_connectionTask = CompletePipeOnTaskCompletion();
await SendPreambleAsync().ConfigureAwait(false);
await SendSettingsAsync();
// Requests can be sent before receiving and acking settings.
var headers = new[]
{
new KeyValuePair<string, string>(HeaderNames.Method, "POST"),
new KeyValuePair<string, string>(HeaderNames.Path, "/"),
new KeyValuePair<string, string>(HeaderNames.Scheme, "http"),
};
await StartStreamAsync(1, headers, endStream: false);
await ExpectAsync(Http2FrameType.SETTINGS,
withLength: 3 * Http2FrameReader.SettingSize,
withFlags: 0,
withStreamId: 0);
await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: 0,
withStreamId: 0);
await ExpectAsync(Http2FrameType.SETTINGS,
withLength: 0,
withFlags: (byte)Http2SettingsFrameFlags.ACK,
withStreamId: 0);
await WaitForStreamErrorAsync(1, Http2ErrorCode.REFUSED_STREAM, "HTTP/2 stream ID 1 error (REFUSED_STREAM): A new stream was refused because this connection has reached its stream limit.");
// These frames should be drained and ignored while in cool-down mode.
switch (finalFrameType)
{
case Http2FrameType.DATA:
await SendDataAsync(1, new byte[100], endStream: true);
break;
case Http2FrameType.WINDOW_UPDATE:
await SendWindowUpdateAsync(1, 1024);
break;
case Http2FrameType.HEADERS:
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_STREAM | Http2HeadersFrameFlags.END_HEADERS, _requestTrailers);
break;
case Http2FrameType.CONTINUATION:
await SendHeadersAsync(1, Http2HeadersFrameFlags.END_STREAM, _requestTrailers);
await SendContinuationAsync(1, Http2ContinuationFrameFlags.END_HEADERS, _requestTrailers);
break;
default:
throw new NotImplementedException(finalFrameType.ToString());
}
await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false);
}
[Fact]
public async Task RefusedStream_Post_2xLimitRefused()
{
var requestBlock = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
CreateConnection();
_connection.ServerSettings.MaxConcurrentStreams = 1;
var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(_ => requestBlock.Task));
async Task CompletePipeOnTaskCompletion()
{
try
{
await connectionTask;
}
finally
{
_pair.Transport.Input.Complete();
_pair.Transport.Output.Complete();
}
}
_connectionTask = CompletePipeOnTaskCompletion();
await SendPreambleAsync().ConfigureAwait(false);
await SendSettingsAsync();
// Requests can be sent before receiving and acking settings.
var headers = new[]
{
new KeyValuePair<string, string>(HeaderNames.Method, "POST"),
new KeyValuePair<string, string>(HeaderNames.Path, "/"),
new KeyValuePair<string, string>(HeaderNames.Scheme, "http"),
};
// This mimics gRPC, sending headers and data close together before receiving a reset.
await StartStreamAsync(1, headers, endStream: false);
await SendDataAsync(1, new byte[100], endStream: false);
await StartStreamAsync(3, headers, endStream: false);
await SendDataAsync(3, new byte[100], endStream: false);
await StartStreamAsync(5, headers, endStream: false);
await SendDataAsync(5, new byte[100], endStream: false);
await StartStreamAsync(7, headers, endStream: false);
await SendDataAsync(7, new byte[100], endStream: false);
await ExpectAsync(Http2FrameType.SETTINGS,
withLength: 3 * Http2FrameReader.SettingSize,
withFlags: 0,
withStreamId: 0);
await ExpectAsync(Http2FrameType.WINDOW_UPDATE,
withLength: 4,
withFlags: 0,
withStreamId: 0);
await ExpectAsync(Http2FrameType.SETTINGS,
withLength: 0,
withFlags: (byte)Http2SettingsFrameFlags.ACK,
withStreamId: 0);
await WaitForStreamErrorAsync(3, Http2ErrorCode.REFUSED_STREAM, "HTTP/2 stream ID 3 error (REFUSED_STREAM): A new stream was refused because this connection has reached its stream limit.");
await WaitForStreamErrorAsync(5, Http2ErrorCode.REFUSED_STREAM, "HTTP/2 stream ID 5 error (REFUSED_STREAM): A new stream was refused because this connection has reached its stream limit.");
await WaitForStreamErrorAsync(7, Http2ErrorCode.REFUSED_STREAM, "HTTP/2 stream ID 7 error (REFUSED_STREAM): A new stream was refused because this connection has reached its stream limit.");
requestBlock.SetResult(0);
await StopConnectionAsync(expectedLastStreamId: 7, ignoreNonGoAwayFrames: true);
}
[Theory]
[InlineData((int)(Http2FrameType.DATA))]
[InlineData((int)(Http2FrameType.HEADERS))]

View File

@ -463,6 +463,8 @@ namespace Interop.FunctionalTests
{
}
public Task SendStarted => _sendStarted.Task;
public async Task SendAsync(string text)
{
await _sendStarted.Task;
@ -845,6 +847,7 @@ namespace Interop.FunctionalTests
var request = CreateRequestMessage(HttpMethod.Post, url, streamingContent);
var requestTask = client.SendAsync(request);
await requestReceived.Task.DefaultTimeout();
await streamingContent.SendStarted.DefaultTimeout();
streamingContent.Abort();
await serverResult.Task.DefaultTimeout();
await Assert.ThrowsAnyAsync<Exception>(() => requestTask).DefaultTimeout();
@ -1246,7 +1249,7 @@ namespace Interop.FunctionalTests
await host.StopAsync().DefaultTimeout();
}
[Theory(Skip = "https://github.com/aspnet/AspNetCore/issues/17484")]
[Theory]
[MemberData(nameof(SupportedSchemes))]
public async Task Settings_MaxConcurrentStreamsPost_Server(string scheme)
{