Clean up libuv connection (#1726)

* Clean up libuv connection
- Cancel all pending flushes on the input writer before
disposing the stream handle.
- Complete the pipe before disposing the socket
- Added logging for connection pause/resume.
- Added test
This commit is contained in:
David Fowler 2017-04-21 13:13:05 -07:00 committed by GitHub
parent feb9d1281e
commit a749939be4
8 changed files with 222 additions and 93 deletions

View File

@ -23,5 +23,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
void ConnectionError(string connectionId, Exception ex);
void ConnectionReset(string connectionId);
void ConnectionPause(string connectionId);
void ConnectionResume(string connectionId);
}
}

View File

@ -4,11 +4,10 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
@ -53,7 +52,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private IConnectionHandler ConnectionHandler => ListenerContext.TransportContext.ConnectionHandler;
private LibuvThread Thread => ListenerContext.Thread;
public async void Start()
public async Task Start()
{
try
{
@ -64,7 +63,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, ConnectionId, Log);
// Start socket prior to applying the ConnectionAdapter
_socket.ReadStart(_allocCallback, _readCallback, this);
StartReading();
try
{
@ -80,9 +79,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
finally
{
// Ensure the socket is disposed prior to completing in the input writer.
_socket.Dispose();
// Make sure it isn't possible for a paused read to resume reading after calling uv_close
// on the stream handle
Input.CancelPendingFlush();
// Now, complete the input so that no more reads can happen
Input.Complete(new TaskCanceledException("The request was aborted"));
// We're done with the socket now
_socket.Dispose();
// Tell the kestrel we're done with this connection
_connectionContext.OnConnectionClosed();
}
}
@ -171,12 +178,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
_currentWritableBuffer = null;
if (flushTask?.IsCompleted == false)
{
Pause();
Log.ConnectionPause(ConnectionId);
StopReading();
var result = await flushTask.Value;
// If the reader isn't complete then resume
if (!result.IsCompleted)
if (!result.IsCompleted && !result.IsCancelled)
{
Resume();
Log.ConnectionResume(ConnectionId);
StartReading();
}
}
@ -189,31 +199,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
private void Pause()
private void StopReading()
{
// It's possible that uv_close was called between the call to Thread.Post() and now.
if (!_socket.IsClosed)
{
_socket.ReadStop();
}
_socket.ReadStop();
}
private void Resume()
private void StartReading()
{
// It's possible that uv_close was called even before the call to Resume().
if (!_socket.IsClosed)
try
{
try
{
_socket.ReadStart(_allocCallback, _readCallback, this);
}
catch (UvException)
{
// ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected).
// This should be treated the same as OnRead() seeing a "normalDone" condition.
Log.ConnectionReadFin(ConnectionId);
Input.Complete();
}
_socket.ReadStart(_allocCallback, _readCallback, this);
}
catch (UvException)
{
// ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected).
// This should be treated the same as OnRead() seeing a "normalDone" condition.
Log.ConnectionReadFin(ConnectionId);
Input.Complete();
}
}
}

View File

@ -10,6 +10,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
// ConnectionRead: Reserved: 3
private static readonly Action<ILogger, string, Exception> _connectionPause =
LoggerMessage.Define<string>(LogLevel.Debug, 4, @"Connection id ""{ConnectionId}"" paused.");
private static readonly Action<ILogger, string, Exception> _connectionResume =
LoggerMessage.Define<string>(LogLevel.Debug, 5, @"Connection id ""{ConnectionId}"" resumed.");
private static readonly Action<ILogger, string, Exception> _connectionReadFin =
LoggerMessage.Define<string>(LogLevel.Debug, 6, @"Connection id ""{ConnectionId}"" received FIN.");
@ -79,6 +85,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
_connectionReset(_logger, connectionId, null);
}
public void ConnectionPause(string connectionId)
{
_connectionPause(_logger, connectionId, null);
}
public void ConnectionResume(string connectionId)
{
_connectionResume(_logger, connectionId, null);
}
public IDisposable BeginScope<TState>(TState state) => _logger.BeginScope(state);
public bool IsEnabled(LogLevel logLevel) => _logger.IsEnabled(logLevel);

View File

@ -95,7 +95,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
var listener = (Listener) state;
var listener = (Listener)state;
if (error != null)
{
@ -132,7 +132,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
protected virtual void DispatchConnection(UvStreamHandle socket)
{
var connection = new LibuvConnection(this, socket);
connection.Start();
_ = connection.Start();
}
public virtual async Task DisposeAsync()

View File

@ -102,11 +102,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
(handle, status2, state) => ((ListenerSecondary)state).ReadStartCallback(handle, status2),
this);
writeReq.Init(Thread);
var result = await writeReq.WriteAsync(
DispatchPipe,
new ArraySegment<ArraySegment<byte>>(new [] { new ArraySegment<byte>(_pipeMessage) }));
writeReq.Init(Thread);
var result = await writeReq.WriteAsync(
DispatchPipe,
new ArraySegment<ArraySegment<byte>>(new[] { new ArraySegment<byte>(_pipeMessage) }));
if (result.Error != null)
{
tcs.SetException(result.Error);
@ -163,7 +163,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
try
{
var connection = new LibuvConnection(this, acceptSocket);
connection.Start();
_ = connection.Start();
}
catch (UvException ex)
{

View File

@ -4,9 +4,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
using Microsoft.AspNetCore.Testing;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
@ -16,38 +19,160 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
[Fact]
public async Task DoesNotEndConnectionOnZeroRead()
{
using (var mockConnectionHandler = new MockConnectionHandler())
var mockConnectionHandler = new MockConnectionHandler();
var mockLibuv = new MockLibuv();
var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler };
var transport = new LibuvTransport(mockLibuv, transportContext, null);
var thread = new LibuvThread(transport);
try
{
var mockLibuv = new MockLibuv();
var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler };
var transport = new LibuvTransport(mockLibuv, transportContext, null);
var thread = new LibuvThread(transport);
try
await thread.StartAsync();
await thread.PostAsync(_ =>
{
await thread.StartAsync();
await thread.PostAsync(_ =>
var listenerContext = new ListenerContext(transportContext)
{
var listenerContext = new ListenerContext(transportContext)
{
Thread = thread
};
var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log);
var connection = new LibuvConnection(listenerContext, socket);
connection.Start();
Thread = thread
};
var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log);
var connection = new LibuvConnection(listenerContext, socket);
_ = connection.Start();
LibuvFunctions.uv_buf_t ignored;
mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out ignored);
mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored);
}, (object)null);
mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored);
mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored);
}, (object)null);
var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync();
Assert.False(readAwaitable.IsCompleted);
}
finally
var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync();
Assert.False(readAwaitable.IsCompleted);
}
finally
{
await thread.StopAsync(TimeSpan.FromSeconds(1));
}
}
[Fact]
public async Task ConnectionDoesNotResumeAfterSocketCloseIfBackpressureIsApplied()
{
var mockConnectionHandler = new MockConnectionHandler();
mockConnectionHandler.InputOptions.MaximumSizeHigh = 3;
var mockLibuv = new MockLibuv();
var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler };
var transport = new LibuvTransport(mockLibuv, transportContext, null);
var thread = new LibuvThread(transport);
// We don't set the output writer scheduler here since we want to run the callback inline
mockConnectionHandler.OutputOptions.ReaderScheduler = thread;
Task connectionTask = null;
try
{
await thread.StartAsync();
// Write enough to make sure back pressure will be applied
await thread.PostAsync<object>(_ =>
{
await thread.StopAsync(TimeSpan.FromSeconds(1));
}
var listenerContext = new ListenerContext(transportContext)
{
Thread = thread
};
var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log);
var connection = new LibuvConnection(listenerContext, socket);
connectionTask = connection.Start();
mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored);
mockLibuv.ReadCallback(socket.InternalGetHandle(), 5, ref ignored);
}, null);
// Now assert that we removed the callback from libuv to stop reading
Assert.Null(mockLibuv.AllocCallback);
Assert.Null(mockLibuv.ReadCallback);
// Now complete the output writer so that the connection closes
mockConnectionHandler.Output.Writer.Complete();
await connectionTask.TimeoutAfter(TimeSpan.FromSeconds(10));
// Assert that we don't try to start reading
Assert.Null(mockLibuv.AllocCallback);
Assert.Null(mockLibuv.ReadCallback);
}
finally
{
await thread.StopAsync(TimeSpan.FromSeconds(1));
}
}
[Fact]
public async Task ConnectionDoesNotResumeAfterReadCallbackScheduledAndSocketCloseIfBackpressureIsApplied()
{
var mockConnectionHandler = new MockConnectionHandler();
mockConnectionHandler.InputOptions.MaximumSizeHigh = 3;
mockConnectionHandler.InputOptions.MaximumSizeLow = 3;
var mockLibuv = new MockLibuv();
var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler };
var transport = new LibuvTransport(mockLibuv, transportContext, null);
var thread = new LibuvThread(transport);
var mockScheduler = new Mock<IScheduler>();
Action backPressure = null;
mockScheduler.Setup(m => m.Schedule(It.IsAny<Action>())).Callback<Action>(a =>
{
backPressure = a;
});
mockConnectionHandler.InputOptions.WriterScheduler = mockScheduler.Object;
mockConnectionHandler.OutputOptions.ReaderScheduler = thread;
Task connectionTask = null;
try
{
await thread.StartAsync();
// Write enough to make sure back pressure will be applied
await thread.PostAsync<object>(_ =>
{
var listenerContext = new ListenerContext(transportContext)
{
Thread = thread
};
var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log);
var connection = new LibuvConnection(listenerContext, socket);
connectionTask = connection.Start();
mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored);
mockLibuv.ReadCallback(socket.InternalGetHandle(), 5, ref ignored);
}, null);
// Now assert that we removed the callback from libuv to stop reading
Assert.Null(mockLibuv.AllocCallback);
Assert.Null(mockLibuv.ReadCallback);
// Now release backpressure by reading the input
var result = await mockConnectionHandler.Input.Reader.ReadAsync();
// Calling advance will call into our custom scheduler that captures the back pressure
// callback
mockConnectionHandler.Input.Reader.Advance(result.Buffer.End);
// Cancel the current pending flush
mockConnectionHandler.Input.Writer.CancelPendingFlush();
// Now release the back pressure
await thread.PostAsync(a => a(), backPressure);
// Assert that we don't try to start reading since the write was cancelled
Assert.Null(mockLibuv.AllocCallback);
Assert.Null(mockLibuv.ReadCallback);
// Now complete the output writer and wait for the connection to close
mockConnectionHandler.Output.Writer.Complete();
await connectionTask.TimeoutAfter(TimeSpan.FromSeconds(10));
// Assert that we don't try to start reading
Assert.Null(mockLibuv.AllocCallback);
Assert.Null(mockLibuv.ReadCallback);
}
finally
{
await thread.StopAsync(TimeSpan.FromSeconds(1));
}
}
}

View File

@ -9,21 +9,15 @@ using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
{
public class MockConnectionHandler : IConnectionHandler, IDisposable
public class MockConnectionHandler : IConnectionHandler
{
private readonly PipeFactory _pipeFactory;
public MockConnectionHandler()
{
_pipeFactory = new PipeFactory();
}
public PipeOptions InputOptions { get; set; } = new PipeOptions();
public PipeOptions OutputOptions { get; set; } = new PipeOptions();
public IConnectionContext OnConnection(IConnectionInformation connectionInfo)
{
Assert.Null(Input);
Input = _pipeFactory.Create();
Output = _pipeFactory.Create();
Input = connectionInfo.PipeFactory.Create(InputOptions ?? new PipeOptions());
Output = connectionInfo.PipeFactory.Create(OutputOptions ?? new PipeOptions());
return new TestConnectionContext
{
@ -34,13 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
public IPipe Input { get; private set; }
public IPipe Output { get; private set; }
public void Dispose()
{
Input?.Writer.Complete();
_pipeFactory.Dispose();
}
private class TestConnectionContext : IConnectionContext
{
public string ConnectionId { get; }
@ -49,22 +37,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
public void OnConnectionClosed()
{
throw new NotImplementedException();
}
public Task StopAsync()
{
throw new NotImplementedException();
}
public void Abort(Exception ex)
{
throw new NotImplementedException();
}
public void Timeout()
{
throw new NotImplementedException();
}
}
}

View File

@ -120,7 +120,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
_uv_err_name = errno => IntPtr.Zero;
_uv_strerror = errno => IntPtr.Zero;
_uv_read_start = UvReadStart;
_uv_read_stop = handle => 0;
_uv_read_stop = (handle) =>
{
AllocCallback = null;
ReadCallback = null;
return 0;
};
_uv_unsafe_async_send = handle =>
{
throw new Exception($"Why is this getting called?{Environment.NewLine}{_stackTrace}");