diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ILibuvTrace.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ILibuvTrace.cs index 3d406cf99c..46512ba482 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ILibuvTrace.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ILibuvTrace.cs @@ -23,5 +23,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal void ConnectionError(string connectionId, Exception ex); void ConnectionReset(string connectionId); + + void ConnectionPause(string connectionId); + + void ConnectionResume(string connectionId); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 718c655b89..3c266d04e1 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -4,11 +4,10 @@ using System; using System.Diagnostics; using System.IO; -using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal @@ -53,7 +52,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private IConnectionHandler ConnectionHandler => ListenerContext.TransportContext.ConnectionHandler; private LibuvThread Thread => ListenerContext.Thread; - public async void Start() + public async Task Start() { try { @@ -64,7 +63,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, ConnectionId, Log); // Start socket prior to applying the ConnectionAdapter - _socket.ReadStart(_allocCallback, _readCallback, this); + StartReading(); try { @@ -80,9 +79,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } finally { - // Ensure the socket is disposed prior to completing in the input writer. - _socket.Dispose(); + // Make sure it isn't possible for a paused read to resume reading after calling uv_close + // on the stream handle + Input.CancelPendingFlush(); + + // Now, complete the input so that no more reads can happen Input.Complete(new TaskCanceledException("The request was aborted")); + + // We're done with the socket now + _socket.Dispose(); + + // Tell the kestrel we're done with this connection _connectionContext.OnConnectionClosed(); } } @@ -171,12 +178,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal _currentWritableBuffer = null; if (flushTask?.IsCompleted == false) { - Pause(); + Log.ConnectionPause(ConnectionId); + StopReading(); + var result = await flushTask.Value; // If the reader isn't complete then resume - if (!result.IsCompleted) + if (!result.IsCompleted && !result.IsCancelled) { - Resume(); + Log.ConnectionResume(ConnectionId); + StartReading(); } } @@ -189,31 +199,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } - private void Pause() + private void StopReading() { - // It's possible that uv_close was called between the call to Thread.Post() and now. - if (!_socket.IsClosed) - { - _socket.ReadStop(); - } + _socket.ReadStop(); } - private void Resume() + private void StartReading() { - // It's possible that uv_close was called even before the call to Resume(). - if (!_socket.IsClosed) + try { - try - { - _socket.ReadStart(_allocCallback, _readCallback, this); - } - catch (UvException) - { - // ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected). - // This should be treated the same as OnRead() seeing a "normalDone" condition. - Log.ConnectionReadFin(ConnectionId); - Input.Complete(); - } + _socket.ReadStart(_allocCallback, _readCallback, this); + } + catch (UvException) + { + // ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected). + // This should be treated the same as OnRead() seeing a "normalDone" condition. + Log.ConnectionReadFin(ConnectionId); + Input.Complete(); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvTrace.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvTrace.cs index 8b3668312c..7e938211fb 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvTrace.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvTrace.cs @@ -10,6 +10,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { // ConnectionRead: Reserved: 3 + private static readonly Action _connectionPause = + LoggerMessage.Define(LogLevel.Debug, 4, @"Connection id ""{ConnectionId}"" paused."); + + private static readonly Action _connectionResume = + LoggerMessage.Define(LogLevel.Debug, 5, @"Connection id ""{ConnectionId}"" resumed."); + private static readonly Action _connectionReadFin = LoggerMessage.Define(LogLevel.Debug, 6, @"Connection id ""{ConnectionId}"" received FIN."); @@ -79,6 +85,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal _connectionReset(_logger, connectionId, null); } + public void ConnectionPause(string connectionId) + { + _connectionPause(_logger, connectionId, null); + } + + public void ConnectionResume(string connectionId) + { + _connectionResume(_logger, connectionId, null); + } + public IDisposable BeginScope(TState state) => _logger.BeginScope(state); public bool IsEnabled(LogLevel logLevel) => _logger.IsEnabled(logLevel); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Listener.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Listener.cs index d9d1cf1798..c1c562c574 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Listener.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Listener.cs @@ -95,7 +95,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state) { - var listener = (Listener) state; + var listener = (Listener)state; if (error != null) { @@ -132,7 +132,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal protected virtual void DispatchConnection(UvStreamHandle socket) { var connection = new LibuvConnection(this, socket); - connection.Start(); + _ = connection.Start(); } public virtual async Task DisposeAsync() diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs index 0d45492472..4b43c9cf79 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/ListenerSecondary.cs @@ -102,11 +102,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal (handle, status2, state) => ((ListenerSecondary)state).ReadStartCallback(handle, status2), this); - writeReq.Init(Thread); - var result = await writeReq.WriteAsync( - DispatchPipe, - new ArraySegment>(new [] { new ArraySegment(_pipeMessage) })); - + writeReq.Init(Thread); + var result = await writeReq.WriteAsync( + DispatchPipe, + new ArraySegment>(new[] { new ArraySegment(_pipeMessage) })); + if (result.Error != null) { tcs.SetException(result.Error); @@ -163,7 +163,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal try { var connection = new LibuvConnection(this, acceptSocket); - connection.Start(); + _ = connection.Start(); } catch (UvException ex) { diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs index d9110cba1e..2ce0000554 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs @@ -4,9 +4,12 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; +using Microsoft.AspNetCore.Testing; +using Moq; using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests @@ -16,38 +19,160 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests [Fact] public async Task DoesNotEndConnectionOnZeroRead() { - using (var mockConnectionHandler = new MockConnectionHandler()) + var mockConnectionHandler = new MockConnectionHandler(); + var mockLibuv = new MockLibuv(); + var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler }; + var transport = new LibuvTransport(mockLibuv, transportContext, null); + var thread = new LibuvThread(transport); + + try { - var mockLibuv = new MockLibuv(); - var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler }; - var transport = new LibuvTransport(mockLibuv, transportContext, null); - var thread = new LibuvThread(transport); - - try + await thread.StartAsync(); + await thread.PostAsync(_ => { - await thread.StartAsync(); - await thread.PostAsync(_ => + var listenerContext = new ListenerContext(transportContext) { - var listenerContext = new ListenerContext(transportContext) - { - Thread = thread - }; - var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log); - var connection = new LibuvConnection(listenerContext, socket); - connection.Start(); + Thread = thread + }; + var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log); + var connection = new LibuvConnection(listenerContext, socket); + _ = connection.Start(); - LibuvFunctions.uv_buf_t ignored; - mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out ignored); - mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored); - }, (object)null); + mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored); + mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored); + }, (object)null); - var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync(); - Assert.False(readAwaitable.IsCompleted); - } - finally + var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync(); + Assert.False(readAwaitable.IsCompleted); + } + finally + { + await thread.StopAsync(TimeSpan.FromSeconds(1)); + } + } + + [Fact] + public async Task ConnectionDoesNotResumeAfterSocketCloseIfBackpressureIsApplied() + { + var mockConnectionHandler = new MockConnectionHandler(); + mockConnectionHandler.InputOptions.MaximumSizeHigh = 3; + var mockLibuv = new MockLibuv(); + var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler }; + var transport = new LibuvTransport(mockLibuv, transportContext, null); + var thread = new LibuvThread(transport); + // We don't set the output writer scheduler here since we want to run the callback inline + mockConnectionHandler.OutputOptions.ReaderScheduler = thread; + Task connectionTask = null; + try + { + await thread.StartAsync(); + + // Write enough to make sure back pressure will be applied + await thread.PostAsync(_ => { - await thread.StopAsync(TimeSpan.FromSeconds(1)); - } + var listenerContext = new ListenerContext(transportContext) + { + Thread = thread + }; + var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log); + var connection = new LibuvConnection(listenerContext, socket); + connectionTask = connection.Start(); + + mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored); + mockLibuv.ReadCallback(socket.InternalGetHandle(), 5, ref ignored); + + }, null); + + // Now assert that we removed the callback from libuv to stop reading + Assert.Null(mockLibuv.AllocCallback); + Assert.Null(mockLibuv.ReadCallback); + + // Now complete the output writer so that the connection closes + mockConnectionHandler.Output.Writer.Complete(); + + await connectionTask.TimeoutAfter(TimeSpan.FromSeconds(10)); + + // Assert that we don't try to start reading + Assert.Null(mockLibuv.AllocCallback); + Assert.Null(mockLibuv.ReadCallback); + } + finally + { + await thread.StopAsync(TimeSpan.FromSeconds(1)); + } + } + + [Fact] + public async Task ConnectionDoesNotResumeAfterReadCallbackScheduledAndSocketCloseIfBackpressureIsApplied() + { + var mockConnectionHandler = new MockConnectionHandler(); + mockConnectionHandler.InputOptions.MaximumSizeHigh = 3; + mockConnectionHandler.InputOptions.MaximumSizeLow = 3; + var mockLibuv = new MockLibuv(); + var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler }; + var transport = new LibuvTransport(mockLibuv, transportContext, null); + var thread = new LibuvThread(transport); + var mockScheduler = new Mock(); + Action backPressure = null; + mockScheduler.Setup(m => m.Schedule(It.IsAny())).Callback(a => + { + backPressure = a; + }); + mockConnectionHandler.InputOptions.WriterScheduler = mockScheduler.Object; + mockConnectionHandler.OutputOptions.ReaderScheduler = thread; + Task connectionTask = null; + try + { + await thread.StartAsync(); + + // Write enough to make sure back pressure will be applied + await thread.PostAsync(_ => + { + var listenerContext = new ListenerContext(transportContext) + { + Thread = thread + }; + var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log); + var connection = new LibuvConnection(listenerContext, socket); + connectionTask = connection.Start(); + + mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out var ignored); + mockLibuv.ReadCallback(socket.InternalGetHandle(), 5, ref ignored); + + }, null); + + // Now assert that we removed the callback from libuv to stop reading + Assert.Null(mockLibuv.AllocCallback); + Assert.Null(mockLibuv.ReadCallback); + + // Now release backpressure by reading the input + var result = await mockConnectionHandler.Input.Reader.ReadAsync(); + // Calling advance will call into our custom scheduler that captures the back pressure + // callback + mockConnectionHandler.Input.Reader.Advance(result.Buffer.End); + + // Cancel the current pending flush + mockConnectionHandler.Input.Writer.CancelPendingFlush(); + + // Now release the back pressure + await thread.PostAsync(a => a(), backPressure); + + // Assert that we don't try to start reading since the write was cancelled + Assert.Null(mockLibuv.AllocCallback); + Assert.Null(mockLibuv.ReadCallback); + + // Now complete the output writer and wait for the connection to close + mockConnectionHandler.Output.Writer.Complete(); + + await connectionTask.TimeoutAfter(TimeSpan.FromSeconds(10)); + + // Assert that we don't try to start reading + Assert.Null(mockLibuv.AllocCallback); + Assert.Null(mockLibuv.ReadCallback); + } + finally + { + await thread.StopAsync(TimeSpan.FromSeconds(1)); } } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs index 44bda00009..0ec0a04f12 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockConnectionHandler.cs @@ -9,21 +9,15 @@ using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers { - public class MockConnectionHandler : IConnectionHandler, IDisposable + public class MockConnectionHandler : IConnectionHandler { - private readonly PipeFactory _pipeFactory; - - public MockConnectionHandler() - { - _pipeFactory = new PipeFactory(); - } + public PipeOptions InputOptions { get; set; } = new PipeOptions(); + public PipeOptions OutputOptions { get; set; } = new PipeOptions(); public IConnectionContext OnConnection(IConnectionInformation connectionInfo) { - Assert.Null(Input); - - Input = _pipeFactory.Create(); - Output = _pipeFactory.Create(); + Input = connectionInfo.PipeFactory.Create(InputOptions ?? new PipeOptions()); + Output = connectionInfo.PipeFactory.Create(OutputOptions ?? new PipeOptions()); return new TestConnectionContext { @@ -34,13 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers public IPipe Input { get; private set; } public IPipe Output { get; private set; } - - public void Dispose() - { - Input?.Writer.Complete(); - _pipeFactory.Dispose(); - } - + private class TestConnectionContext : IConnectionContext { public string ConnectionId { get; } @@ -49,22 +37,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers public void OnConnectionClosed() { - throw new NotImplementedException(); - } - public Task StopAsync() - { - throw new NotImplementedException(); } public void Abort(Exception ex) { - throw new NotImplementedException(); - } - - public void Timeout() - { - throw new NotImplementedException(); } } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockLibuv.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockLibuv.cs index e18e2e8572..8906dbd872 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockLibuv.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/TestHelpers/MockLibuv.cs @@ -120,7 +120,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers _uv_err_name = errno => IntPtr.Zero; _uv_strerror = errno => IntPtr.Zero; _uv_read_start = UvReadStart; - _uv_read_stop = handle => 0; + _uv_read_stop = (handle) => + { + AllocCallback = null; + ReadCallback = null; + return 0; + }; _uv_unsafe_async_send = handle => { throw new Exception($"Why is this getting called?{Environment.NewLine}{_stackTrace}");