Relieve response backpressure immediately when closing socket (#2557)

* Relieve response backpressure immediately when closing socket
This commit is contained in:
Stephen Halter 2018-05-10 17:13:57 -07:00 committed by GitHub
parent 1a313715c8
commit e6a88c1b9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 42 deletions

View File

@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
internal sealed class SocketConnection : TransportConnection
{
private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2;
private static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
private readonly Socket _socket;
private readonly PipeScheduler _scheduler;
@ -55,11 +54,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
ConnectionClosed = _connectionClosedTokenSource.Token;
// On *nix platforms, Sockets already dispatches to the ThreadPool.
var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline;
_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);
_receiver = new SocketReceiver(_socket, _scheduler);
_sender = new SocketSender(_socket, _scheduler);
}
public override MemoryPool<byte> MemoryPool { get; }
@ -69,30 +65,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
public async Task StartAsync(IConnectionDispatcher connectionDispatcher)
{
Exception sendError = null;
try
{
connectionDispatcher.OnConnection(this);
// Spawn send and receive logic
Task receiveTask = DoReceive();
Task<Exception> sendTask = DoSend();
// If the sending task completes then close the receive
// We don't need to do this in the other direction because the kestrel
// will trigger the output closing once the input is complete.
if (await Task.WhenAny(receiveTask, sendTask) == sendTask)
{
// Tell the reader it's being aborted
_socket.Dispose();
}
var receiveTask = DoReceive();
var sendTask = DoSend();
// Now wait for both to complete
await receiveTask;
sendError = await sendTask;
await sendTask;
// Dispose the socket(should noop if already called)
_socket.Dispose();
_receiver.Dispose();
_sender.Dispose();
ThreadPool.QueueUserWorkItem(state => ((SocketConnection)state).CancelConnectionClosedToken(), this);
@ -101,18 +85,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}.");
}
finally
{
// Complete the output after disposing the socket
Output.Complete(sendError);
}
}
public override void Abort()
{
// Try to gracefully close the socket to match libuv behavior.
Shutdown();
_socket.Dispose();
}
private async Task DoReceive()
@ -207,7 +185,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
}
}
private async Task<Exception> DoSend()
private async Task DoSend()
{
Exception error = null;
@ -231,10 +209,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
error = new IOException(ex.Message, ex);
}
finally
{
Shutdown();
Shutdown();
return error;
// Complete the output after disposing the socket
Output.Complete(error);
}
}
private async Task ProcessSends()
@ -284,6 +265,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
// Try to gracefully close the socket even for aborts to match libuv behavior.
_socket.Shutdown(SocketShutdown.Both);
_socket.Dispose();
}
}
}

View File

@ -2642,6 +2642,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
var connectionStopMessageLogged = new TaskCompletionSource<object>();
var requestAborted = new TaskCompletionSource<object>();
var appFuncCompleted = new TaskCompletionSource<object>();
var mockKestrelTrace = new Mock<IKestrelTrace>();
mockKestrelTrace
@ -2649,7 +2650,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
.Callback(() => responseRateTimeoutMessageLogged.SetResult(null));
mockKestrelTrace
.Setup(trace => trace.ConnectionStop(It.IsAny<string>()))
.Callback(() => connectionStopMessageLogged.SetResult(null));;
.Callback(() => connectionStopMessageLogged.SetResult(null));
var testContext = new TestServiceContext
{
@ -2660,7 +2661,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
Limits =
{
MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2))
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2))
}
}
};
@ -2676,10 +2677,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
context.Response.ContentLength = responseSize;
for (var i = 0; i < chunks; i++)
try
{
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize);
for (var i = 0; i < chunks; i++)
{
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize);
}
}
catch (OperationCanceledException)
{
appFuncCompleted.SetResult(null);
throw;
}
}
@ -2702,7 +2711,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await AssertStreamAborted(connection.Reader.BaseStream, chunkSize * chunks);
sw.Stop();
@ -2724,6 +2733,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
var connectionStopMessageLogged = new TaskCompletionSource<object>();
var aborted = new TaskCompletionSource<object>();
var appFuncCompleted = new TaskCompletionSource<object>();
var mockKestrelTrace = new Mock<IKestrelTrace>();
mockKestrelTrace
@ -2740,7 +2750,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
Limits =
{
MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2))
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2))
}
}
};
@ -2762,9 +2772,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
context.Response.ContentLength = chunks * chunkSize;
for (var i = 0; i < chunks; i++)
try
{
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
for (var i = 0; i < chunks; i++)
{
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
}
}
catch (OperationCanceledException)
{
appFuncCompleted.SetResult(null);
throw;
}
}, testContext, listenOptions))
{
@ -2780,6 +2798,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
await aborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
// Temporary workaround for a deadlock when reading from an aborted client SslStream on Mac and Linux.
if (TestPlatformHelper.IsWindows)
@ -2795,6 +2814,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
[Fact]
public async Task ConnectionClosedWhenBothRequestAndResponseExperienceBackPressure()
{
const int bufferSize = 1024;
const int bufferCount = 256 * 1024;
var responseSize = bufferCount * bufferSize;
var buffer = new byte[bufferSize];
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
var connectionStopMessageLogged = new TaskCompletionSource<object>();
var requestAborted = new TaskCompletionSource<object>();
var appFuncCompleted = new TaskCompletionSource<object>();
var mockKestrelTrace = new Mock<IKestrelTrace>();
mockKestrelTrace
.Setup(trace => trace.ResponseMininumDataRateNotSatisfied(It.IsAny<string>(), It.IsAny<string>()))
.Callback(() => responseRateTimeoutMessageLogged.SetResult(null));
mockKestrelTrace
.Setup(trace => trace.ConnectionStop(It.IsAny<string>()))
.Callback(() => connectionStopMessageLogged.SetResult(null));
var testContext = new TestServiceContext
{
LoggerFactory = LoggerFactory,
Log = mockKestrelTrace.Object,
SystemClock = new SystemClock(),
ServerOptions =
{
Limits =
{
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2)),
MaxRequestBodySize = responseSize
}
}
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
async Task App(HttpContext context)
{
context.RequestAborted.Register(() =>
{
requestAborted.SetResult(null);
});
try
{
await context.Request.Body.CopyToAsync(context.Response.Body);
}
catch
{
// This should always throw an OperationCanceledException. Unfortunately a ECONNRESET UvException sometimes gets thrown.
// This will be fixed by https://github.com/aspnet/KestrelHttpServer/pull/2547
appFuncCompleted.SetResult(null);
throw;
}
}
using (var server = new TestServer(App, testContext, listenOptions))
{
using (var connection = server.CreateConnection())
{
// Close the connection with the last request so AssertStreamCompleted actually completes.
await connection.Send(
"POST / HTTP/1.1",
"Host:",
$"Content-Length: {responseSize}",
"",
"");
var sendTask = Task.Run(async () =>
{
for (var i = 0; i < bufferCount; i++)
{
await connection.Stream.WriteAsync(buffer, 0, buffer.Length);
}
});
await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
await AssertStreamAborted(connection.Stream, responseSize);
}
}
}
[Fact]
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseChunks()
{
@ -2803,6 +2909,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var chunkData = new byte[chunkSize];
var requestAborted = false;
var appFuncCompleted = new TaskCompletionSource<object>();
var mockKestrelTrace = new Mock<IKestrelTrace>();
var testContext = new TestServiceContext
@ -2831,6 +2938,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
}
appFuncCompleted.SetResult(null);
}
using (var server = new TestServer(App, testContext, listenOptions))
@ -2850,6 +2959,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
// Make sure consuming a single chunk exceeds the 2 second timeout.
var targetBytesPerSecond = chunkSize / 4;
await AssertStreamCompleted(connection.Reader.BaseStream, minTotalOutputSize, targetBytesPerSecond);
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
mockKestrelTrace.Verify(t => t.ResponseMininumDataRateNotSatisfied(It.IsAny<string>(), It.IsAny<string>()), Times.Never());
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
@ -2859,7 +2969,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
[Fact]
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders()
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders()
{
var headerSize = 1024 * 1024; // 1 MB for each header value
var headerCount = 64; // 64 MB of headers per response
@ -2933,7 +3043,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
[Fact]
public async Task NonZeroContentLengthFor304StatusCodeIsAllowed()
{

View File

@ -49,6 +49,8 @@ namespace Microsoft.AspNetCore.Testing
public Socket Socket => _socket;
public Stream Stream => _stream;
public StreamReader Reader => _reader;
public void Dispose()