diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs index 3e8ad1e34c..96c147725b 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvConnection.cs @@ -40,11 +40,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } - // For testing - public LibuvConnection() - { - } - public string ConnectionId { get; set; } public IPipeWriter Input { get; set; } public LibuvOutputConsumer Output { get; set; } @@ -128,76 +123,72 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private void OnRead(UvStreamHandle handle, int status) { - var normalRead = status >= 0; - var normalDone = status == LibuvConstants.EOF; - var errorDone = !(normalDone || normalRead); - var readCount = normalRead ? status : 0; - - if (normalRead) + if (status == 0) { - Log.ConnectionRead(ConnectionId, readCount); + // EAGAIN/EWOULDBLOCK so just return the buffer. + // http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb + Debug.Assert(_currentWritableBuffer != null); + _currentWritableBuffer.Value.Commit(); + } + else if (status > 0) + { + Log.ConnectionRead(ConnectionId, status); + + Debug.Assert(_currentWritableBuffer != null); + var currentWritableBuffer = _currentWritableBuffer.Value; + currentWritableBuffer.Advance(status); + var flushTask = currentWritableBuffer.FlushAsync(); + + if (!flushTask.IsCompleted) + { + // We wrote too many bytes to the reader, so pause reading and resume when + // we hit the low water mark. + _ = ApplyBackpressureAsync(flushTask); + } } else { + // Given a negative status, it's possible that OnAlloc wasn't called. + _currentWritableBuffer?.Commit(); _socket.ReadStop(); - if (normalDone) + IOException error = null; + + if (status == LibuvConstants.EOF) { Log.ConnectionReadFin(ConnectionId); } - } - - IOException error = null; - WritableBufferAwaitable? flushTask = null; - if (errorDone) - { - handle.Libuv.Check(status, out var uvError); - - // Log connection resets at a lower (Debug) level. - if (status == LibuvConstants.ECONNRESET) - { - Log.ConnectionReset(ConnectionId); - error = new ConnectionResetException(uvError.Message, uvError); - } else { - Log.ConnectionError(ConnectionId, uvError); - error = new IOException(uvError.Message, uvError); + handle.Libuv.Check(status, out var uvError); + + // Log connection resets at a lower (Debug) level. + if (status == LibuvConstants.ECONNRESET) + { + Log.ConnectionReset(ConnectionId); + error = new ConnectionResetException(uvError.Message, uvError); + } + else + { + Log.ConnectionError(ConnectionId, uvError); + error = new IOException(uvError.Message, uvError); + } } - _currentWritableBuffer?.Commit(); - } - else - { - Debug.Assert(_currentWritableBuffer != null); - - var currentWritableBuffer = _currentWritableBuffer.Value; - currentWritableBuffer.Advance(readCount); - flushTask = currentWritableBuffer.FlushAsync(); - } - - _currentWritableBuffer = null; - _bufferHandle.Free(); - - if (!normalRead) - { _connectionContext.Abort(error); - // Complete after aborting the connection Input.Complete(error); } - else if (flushTask?.IsCompleted == false) - { - // We wrote too many bytes too the reader so pause reading and resume when - // we hit the low water mark - _ = ApplyBackpressureAsync(flushTask.Value); - } + + // Cleanup state from last OnAlloc. This is safe even if OnAlloc wasn't called. + _currentWritableBuffer = null; + _bufferHandle.Free(); } private async Task ApplyBackpressureAsync(WritableBufferAwaitable flushTask) { Log.ConnectionPause(ConnectionId); - StopReading(); + _socket.ReadStop(); var result = await flushTask; @@ -209,11 +200,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } - private void StopReading() - { - _socket.ReadStop(); - } - private void StartReading() { try @@ -223,7 +209,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal catch (UvException ex) { // ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected). - // This should be treated the same as OnRead() seeing a "normalDone" condition. + // This should be treated the same as OnRead() seeing a negative status. Log.ConnectionReadFin(ConnectionId); var error = new IOException(ex.Message, ex); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvConnectionTests.cs similarity index 83% rename from test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs rename to test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvConnectionTests.cs index 2ce0000554..7034a65711 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/ConnectionTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvConnectionTests.cs @@ -14,7 +14,7 @@ using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests { - public class ConnectionTests + public class LibuvConnectionTests { [Fact] public async Task DoesNotEndConnectionOnZeroRead() @@ -42,7 +42,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored); }, (object)null); - var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync(); + var readAwaitable = mockConnectionHandler.Input.Reader.ReadAsync(); Assert.False(readAwaitable.IsCompleted); } finally @@ -175,5 +175,40 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await thread.StopAsync(TimeSpan.FromSeconds(1)); } } + + [Fact] + public async Task DoesNotThrowIfOnReadCallbackCalledWithEOFButAllocCallbackNotCalled() + { + var mockConnectionHandler = new MockConnectionHandler(); + var mockLibuv = new MockLibuv(); + var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler }; + var transport = new LibuvTransport(mockLibuv, transportContext, null); + var thread = new LibuvThread(transport); + + try + { + await thread.StartAsync(); + await thread.PostAsync(_ => + { + var listenerContext = new ListenerContext(transportContext) + { + Thread = thread + }; + var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log); + var connection = new LibuvConnection(listenerContext, socket); + _ = connection.Start(); + + var ignored = new LibuvFunctions.uv_buf_t(); + mockLibuv.ReadCallback(socket.InternalGetHandle(), TestConstants.EOF, ref ignored); + }, (object)null); + + var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync(); + Assert.True(readAwaitable.IsCompleted); + } + finally + { + await thread.StopAsync(TimeSpan.FromSeconds(1)); + } + } } } \ No newline at end of file