Prevent WebSockets from throwing during graceful shutdown (#27123)

- Fix canceled ReadResult handling in Http1UpgradeMessageBody
- Add UpgradeTests.DoesNotThrowGivenCanceledReadResult
This commit is contained in:
Stephen Halter 2020-10-23 11:05:58 -07:00 committed by GitHub
parent 074069f9c9
commit c15cd69b65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 4 deletions

View File

@ -14,6 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
/// </summary>
internal sealed class Http1UpgradeMessageBody : Http1MessageBody
{
private int _userCanceled;
public Http1UpgradeMessageBody(Http1Connection context, bool keepAlive)
: base(context, keepAlive)
{
@ -26,13 +28,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();
return _context.Input.ReadAsync(cancellationToken);
return ReadAsyncInternal(cancellationToken);
}
public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();
return _context.Input.TryRead(out result);
return TryReadInternal(out result);
}
public override void AdvanceTo(SequencePosition consumed)
@ -54,6 +56,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public override void CancelPendingRead()
{
Interlocked.Exchange(ref _userCanceled, 1);
_context.Input.CancelPendingRead();
}
@ -69,12 +72,49 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public override bool TryReadInternal(out ReadResult readResult)
{
return _context.Input.TryRead(out readResult);
// Ignore the canceled readResult unless it was canceled by the user.
do
{
if (!_context.Input.TryRead(out readResult))
{
return false;
}
} while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0);
return true;
}
public override ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
{
return _context.Input.ReadAsync(cancellationToken);
ReadResult readResult;
// Ignore the canceled readResult unless it was canceled by the user.
do
{
var readTask = _context.Input.ReadAsync(cancellationToken);
if (!readTask.IsCompletedSuccessfully)
{
return ReadAsyncInternalAwaited(readTask, cancellationToken);
}
readResult = readTask.GetAwaiter().GetResult();
} while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0);
return new ValueTask<ReadResult>(readResult);
}
private async ValueTask<ReadResult> ReadAsyncInternalAwaited(ValueTask<ReadResult> readTask, CancellationToken cancellationToken = default)
{
var readResult = await readTask;
// Ignore the canceled readResult unless it was canceled by the user.
while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0)
{
readResult = await _context.Input.ReadAsync(cancellationToken);
}
return readResult;
}
}
}

View File

@ -3,7 +3,10 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
@ -375,5 +378,50 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
}
[Fact]
public async Task DoesNotThrowGivenCanceledReadResult()
{
var appCompletedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
await using var server = new TestServer(async context =>
{
try
{
var upgradeFeature = context.Features.Get<IHttpUpgradeFeature>();
var duplexStream = await upgradeFeature.UpgradeAsync();
// Kestrel will call Transport.Input.CancelPendingRead() during shutdown so idle connections
// can wake up and shutdown gracefully. We manually call CancelPendingRead() to simulate this and
// ensure the Stream returned by UpgradeAsync doesn't throw in this case.
// https://github.com/dotnet/aspnetcore/issues/26482
var connectionTransportFeature = context.Features.Get<IConnectionTransportFeature>();
connectionTransportFeature.Transport.Input.CancelPendingRead();
// Use ReadAsync() instead of CopyToAsync() for this test since IsCanceled is only checked in
// HttpRequestStream.ReadAsync() and not HttpRequestStream.CopyToAsync()
Assert.Equal(0, await duplexStream.ReadAsync(new byte[1]));
appCompletedTcs.SetResult(null);
}
catch (Exception ex)
{
appCompletedTcs.SetException(ex);
throw;
}
},
new TestServiceContext(LoggerFactory));
using (var connection = server.CreateConnection())
{
await connection.SendEmptyGetWithUpgrade();
await connection.Receive("HTTP/1.1 101 Switching Protocols",
"Connection: Upgrade",
$"Date: {server.Context.DateHeaderValue}",
"",
"");
}
await appCompletedTcs.Task.DefaultTimeout();
}
}
}