Don't dispose WriteReqPool and PipeFactory too soon (#1633)

This commit is contained in:
Stephen Halter 2017-04-08 01:35:02 -07:00 committed by David Fowler
parent 0fd885e5eb
commit 58284bde5c
3 changed files with 84 additions and 30 deletions

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
public IPEndPoint RemoteEndPoint { get; set; }
public IPEndPoint LocalEndPoint { get; set; }
public PipeFactory PipeFactory => ListenerContext.Thread.PipelineFactory;
public PipeFactory PipeFactory => ListenerContext.Thread.PipeFactory;
public IScheduler InputWriterScheduler => ListenerContext.Thread;
public IScheduler OutputReaderScheduler => ListenerContext.Thread;

View File

@ -69,7 +69,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
#endif
QueueCloseHandle = PostCloseHandle;
QueueCloseAsyncHandle = EnqueueCloseHandle;
PipelineFactory = new PipeFactory();
PipeFactory = new PipeFactory();
WriteReqPool = new WriteReqPool(this, _log);
ConnectionManager = new LibuvConnectionManager(this);
}
@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
public UvLoopHandle Loop { get { return _loop; } }
public PipeFactory PipelineFactory { get; }
public PipeFactory PipeFactory { get; }
public LibuvConnectionManager ConnectionManager { get; }
@ -140,7 +140,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
catch (ObjectDisposedException)
{
// Until we rework this logic, ODEs are bound to happen sometimes.
if (!await WaitAsync(_threadTcs.Task, stepTimeout).ConfigureAwait(false))
{
_log.LogCritical($"{nameof(LibuvThread)}.{nameof(StopAsync)} failed to terminate libuv thread.");
@ -157,34 +156,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private async Task DisposeConnectionsAsync()
{
try
// Close and wait for all connections
if (!await ConnectionManager.WalkConnectionsAndCloseAsync(_shutdownTimeout).ConfigureAwait(false))
{
// Close and wait for all connections
if (!await ConnectionManager.WalkConnectionsAndCloseAsync(_shutdownTimeout).ConfigureAwait(false))
{
_log.NotAllConnectionsClosedGracefully();
_log.NotAllConnectionsClosedGracefully();
if (!await ConnectionManager.WalkConnectionsAndAbortAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false))
{
_log.NotAllConnectionsAborted();
}
if (!await ConnectionManager.WalkConnectionsAndAbortAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false))
{
_log.NotAllConnectionsAborted();
}
var result = await WaitAsync(PostAsync(state =>
{
var listener = state;
listener.WriteReqPool.Dispose();
},
this), _shutdownTimeout).ConfigureAwait(false);
if (!result)
{
_log.LogError(0, null, "Disposing write requests failed");
}
}
finally
{
PipelineFactory.Dispose();
}
}
@ -334,8 +314,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
finally
{
_threadTcs.SetResult(null);
PipeFactory.Dispose();
WriteReqPool.Dispose();
thisHandle.Free();
_threadTcs.SetResult(null);
}
}

View File

@ -172,6 +172,78 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
}
[Fact]
public async Task ServerShutsDownGracefullyWhenMaxRequestBufferSizeExceeded()
{
// Parameters
var data = new byte[_dataLength];
var bytesWrittenTimeout = TimeSpan.FromMilliseconds(100);
var bytesWrittenPollingInterval = TimeSpan.FromMilliseconds(bytesWrittenTimeout.TotalMilliseconds / 10);
var maxSendSize = 4096;
var startReadingRequestBody = new ManualResetEvent(false);
var clientFinishedSendingRequestBody = new ManualResetEvent(false);
var lastBytesWritten = DateTime.MaxValue;
using (var host = StartWebHost(16 * 1024, data, false, startReadingRequestBody, clientFinishedSendingRequestBody))
{
var port = host.GetPort();
using (var socket = CreateSocket(port))
using (var stream = new NetworkStream(socket))
{
await WritePostRequestHeaders(stream, data.Length);
var bytesWritten = 0;
Func<Task> sendFunc = async () =>
{
while (bytesWritten < data.Length)
{
var size = Math.Min(data.Length - bytesWritten, maxSendSize);
await stream.WriteAsync(data, bytesWritten, size);
bytesWritten += size;
lastBytesWritten = DateTime.Now;
}
clientFinishedSendingRequestBody.Set();
};
var gnore = sendFunc();
// The minimum is (maxRequestBufferSize - maxSendSize + 1), since if bytesWritten is
// (maxRequestBufferSize - maxSendSize) or smaller, the client should be able to
// complete another send.
var minimumExpectedBytesWritten = (16 * 1024) - maxSendSize + 1;
// The maximum is harder to determine, since there can be OS-level buffers in both the client
// and server, which allow the client to send more than maxRequestBufferSize before getting
// paused. We assume the combined buffers are smaller than the difference between
// data.Length and maxRequestBufferSize.
var maximumExpectedBytesWritten = data.Length - 1;
// Block until the send task has gone a while without writing bytes AND
// the bytes written exceeds the minimum expected. This indicates the server buffer
// is full.
//
// If the send task is paused before the expected number of bytes have been
// written, keep waiting since the pause may have been caused by something else
// like a slow machine.
while ((DateTime.Now - lastBytesWritten) < bytesWrittenTimeout ||
bytesWritten < minimumExpectedBytesWritten)
{
await Task.Delay(bytesWrittenPollingInterval);
}
// Verify the number of bytes written before the client was paused.
Assert.InRange(bytesWritten, minimumExpectedBytesWritten, maximumExpectedBytesWritten);
// Dispose host prior to closing connection to verify the server doesn't throw during shutdown
// if a connection no longer has alloc and read callbacks configured.
host.Dispose();
}
}
}
private static IWebHost StartWebHost(long? maxRequestBufferSize, byte[] expectedBody, bool useConnectionAdapter, ManualResetEvent startReadingRequestBody,
ManualResetEvent clientFinishedSendingRequestBody)
{