Gracefully wait for streams to complete when pipe completes
This commit is contained in:
parent
aaf0293ebc
commit
7c9e234457
|
|
@ -1131,7 +1131,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
// chunked is applied to a response payload body, the sender MUST either
|
||||
// apply chunked as the final transfer coding or terminate the message
|
||||
// by closing the connection.
|
||||
if (hasTransferEncoding &&
|
||||
if (hasTransferEncoding &&
|
||||
HttpHeaders.GetFinalTransferCoding(responseHeaders.HeaderTransferEncoding) != TransferCoding.Chunked)
|
||||
{
|
||||
_keepAlive = false;
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
private readonly object _stateLock = new object();
|
||||
private int _highestOpenedStreamId;
|
||||
private Http2ConnectionState _state = Http2ConnectionState.Open;
|
||||
private readonly TaskCompletionSource<object> _streamsCompleted = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
private readonly ConcurrentDictionary<int, Http2Stream> _streams = new ConcurrentDictionary<int, Http2Stream>();
|
||||
|
||||
|
|
@ -256,6 +257,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
_frameWriter.WriteGoAwayAsync(_highestOpenedStreamId, errorCode);
|
||||
UpdateState(Http2ConnectionState.Closed);
|
||||
}
|
||||
|
||||
if (_streams.IsEmpty)
|
||||
{
|
||||
_streamsCompleted.TrySetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure aborting each stream doesn't result in unnecessary WINDOW_UPDATE frames being sent.
|
||||
|
|
@ -266,6 +272,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
stream.Abort(connectionError);
|
||||
}
|
||||
|
||||
await _streamsCompleted.Task;
|
||||
|
||||
_frameWriter.Complete();
|
||||
}
|
||||
catch
|
||||
|
|
@ -891,13 +899,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
|
|||
{
|
||||
_streams.TryRemove(streamId, out _);
|
||||
|
||||
if (_state == Http2ConnectionState.Closing && _streams.IsEmpty)
|
||||
if (_streams.IsEmpty)
|
||||
{
|
||||
_frameWriter.WriteGoAwayAsync(_highestOpenedStreamId, Http2ErrorCode.NO_ERROR);
|
||||
UpdateState(Http2ConnectionState.Closed);
|
||||
if (_state == Http2ConnectionState.Closing)
|
||||
{
|
||||
_frameWriter.WriteGoAwayAsync(_highestOpenedStreamId, Http2ErrorCode.NO_ERROR);
|
||||
UpdateState(Http2ConnectionState.Closed);
|
||||
|
||||
// Wake up request processing loop so the connection can complete if there are no pending requests
|
||||
Input.CancelPendingRead();
|
||||
// Wake up request processing loop so the connection can complete if there are no pending requests
|
||||
Input.CancelPendingRead();
|
||||
}
|
||||
|
||||
// Complete the task waiting on all streams to finish
|
||||
_streamsCompleted.TrySetResult(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1065,12 +1065,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
[Fact]
|
||||
public async Task DATA_Received_NoStreamWindowSpace_ConnectionError()
|
||||
{
|
||||
// I hate doing this, but it avoids exceptions from MemoryPool.Dipose() in debug mode. The problem is since
|
||||
// the stream's ProcessRequestsAsync loop is never awaited by the connection, it's not really possible to
|
||||
// observe when all the blocks are returned. This can be removed after we implement graceful shutdown.
|
||||
Dispose();
|
||||
InitializeConnectionFields(new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(), allowLateReturn: true));
|
||||
|
||||
// _maxData should be 1/4th of the default initial window size + 1.
|
||||
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
|
||||
|
||||
|
|
@ -1093,12 +1087,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
[Fact]
|
||||
public async Task DATA_Received_NoConnectionWindowSpace_ConnectionError()
|
||||
{
|
||||
// I hate doing this, but it avoids exceptions from MemoryPool.Dipose() in debug mode. The problem is since
|
||||
// the stream's ProcessRequestsAsync loop is never awaited by the connection, it's not really possible to
|
||||
// observe when all the blocks are returned. This can be removed after we implement graceful shutdown.
|
||||
Dispose();
|
||||
InitializeConnectionFields(new DiagnosticMemoryPool(KestrelMemoryPool.CreateSlabMemoryPool(), allowLateReturn: true));
|
||||
|
||||
// _maxData should be 1/4th of the default initial window size + 1.
|
||||
Assert.Equal(Http2PeerSettings.DefaultInitialWindowSize + 1, (uint)_maxData.Length * 4);
|
||||
|
||||
|
|
@ -3287,7 +3275,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
_pair.Application.Output.Complete(new ConnectionResetException(string.Empty));
|
||||
|
||||
var result = await _pair.Application.Input.ReadAsync();
|
||||
Assert.True(result.IsCompleted);
|
||||
Assert.False(result.IsCompleted);
|
||||
Assert.Single(_logger.Messages, m => m.Exception is ConnectionResetException);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,9 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.Http2
|
|||
[MemberData(nameof(H2SpecTestCases))]
|
||||
public async Task RunIndividualTestCase(H2SpecTestCase testCase)
|
||||
{
|
||||
var memoryPoolFactory = new DiagnosticMemoryPoolFactory(allowLateReturn: true);
|
||||
|
||||
var hostBuilder = TransportSelector.GetWebHostBuilder(memoryPoolFactory.Create)
|
||||
var hostBuilder = TransportSelector.GetWebHostBuilder()
|
||||
.UseKestrel(options =>
|
||||
{
|
||||
options.Listen(IPAddress.Loopback, 0, listenOptions =>
|
||||
|
|
@ -66,7 +64,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.Http2
|
|||
{
|
||||
skip = "https://github.com/aspnet/KestrelHttpServer/issues/2154";
|
||||
}
|
||||
|
||||
|
||||
dataset.Add(new H2SpecTestCase()
|
||||
{
|
||||
Id = testcase.Item1,
|
||||
|
|
@ -74,7 +72,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests.Http2
|
|||
Https = false,
|
||||
Skip = skip,
|
||||
});
|
||||
|
||||
|
||||
dataset.Add(new H2SpecTestCase()
|
||||
{
|
||||
Id = testcase.Item1,
|
||||
|
|
|
|||
Loading…
Reference in New Issue