Simplify LibuvConnection.OnRead() (#1828)
* Simplify LibuvConnection.OnRead() - Fix a null reference that sometimes occurs given an EOF status
This commit is contained in:
parent
01b8d5fad1
commit
41f1922502
|
|
@ -40,11 +40,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
}
|
||||
}
|
||||
|
||||
// For testing
|
||||
public LibuvConnection()
|
||||
{
|
||||
}
|
||||
|
||||
public string ConnectionId { get; set; }
|
||||
public IPipeWriter Input { get; set; }
|
||||
public LibuvOutputConsumer Output { get; set; }
|
||||
|
|
@ -128,76 +123,72 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
|
||||
private void OnRead(UvStreamHandle handle, int status)
|
||||
{
|
||||
var normalRead = status >= 0;
|
||||
var normalDone = status == LibuvConstants.EOF;
|
||||
var errorDone = !(normalDone || normalRead);
|
||||
var readCount = normalRead ? status : 0;
|
||||
|
||||
if (normalRead)
|
||||
if (status == 0)
|
||||
{
|
||||
Log.ConnectionRead(ConnectionId, readCount);
|
||||
// EAGAIN/EWOULDBLOCK so just return the buffer.
|
||||
// http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb
|
||||
Debug.Assert(_currentWritableBuffer != null);
|
||||
_currentWritableBuffer.Value.Commit();
|
||||
}
|
||||
else if (status > 0)
|
||||
{
|
||||
Log.ConnectionRead(ConnectionId, status);
|
||||
|
||||
Debug.Assert(_currentWritableBuffer != null);
|
||||
var currentWritableBuffer = _currentWritableBuffer.Value;
|
||||
currentWritableBuffer.Advance(status);
|
||||
var flushTask = currentWritableBuffer.FlushAsync();
|
||||
|
||||
if (!flushTask.IsCompleted)
|
||||
{
|
||||
// We wrote too many bytes to the reader, so pause reading and resume when
|
||||
// we hit the low water mark.
|
||||
_ = ApplyBackpressureAsync(flushTask);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Given a negative status, it's possible that OnAlloc wasn't called.
|
||||
_currentWritableBuffer?.Commit();
|
||||
_socket.ReadStop();
|
||||
|
||||
if (normalDone)
|
||||
IOException error = null;
|
||||
|
||||
if (status == LibuvConstants.EOF)
|
||||
{
|
||||
Log.ConnectionReadFin(ConnectionId);
|
||||
}
|
||||
}
|
||||
|
||||
IOException error = null;
|
||||
WritableBufferAwaitable? flushTask = null;
|
||||
if (errorDone)
|
||||
{
|
||||
handle.Libuv.Check(status, out var uvError);
|
||||
|
||||
// Log connection resets at a lower (Debug) level.
|
||||
if (status == LibuvConstants.ECONNRESET)
|
||||
{
|
||||
Log.ConnectionReset(ConnectionId);
|
||||
error = new ConnectionResetException(uvError.Message, uvError);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.ConnectionError(ConnectionId, uvError);
|
||||
error = new IOException(uvError.Message, uvError);
|
||||
handle.Libuv.Check(status, out var uvError);
|
||||
|
||||
// Log connection resets at a lower (Debug) level.
|
||||
if (status == LibuvConstants.ECONNRESET)
|
||||
{
|
||||
Log.ConnectionReset(ConnectionId);
|
||||
error = new ConnectionResetException(uvError.Message, uvError);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.ConnectionError(ConnectionId, uvError);
|
||||
error = new IOException(uvError.Message, uvError);
|
||||
}
|
||||
}
|
||||
|
||||
_currentWritableBuffer?.Commit();
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.Assert(_currentWritableBuffer != null);
|
||||
|
||||
var currentWritableBuffer = _currentWritableBuffer.Value;
|
||||
currentWritableBuffer.Advance(readCount);
|
||||
flushTask = currentWritableBuffer.FlushAsync();
|
||||
}
|
||||
|
||||
_currentWritableBuffer = null;
|
||||
_bufferHandle.Free();
|
||||
|
||||
if (!normalRead)
|
||||
{
|
||||
_connectionContext.Abort(error);
|
||||
|
||||
// Complete after aborting the connection
|
||||
Input.Complete(error);
|
||||
}
|
||||
else if (flushTask?.IsCompleted == false)
|
||||
{
|
||||
// We wrote too many bytes too the reader so pause reading and resume when
|
||||
// we hit the low water mark
|
||||
_ = ApplyBackpressureAsync(flushTask.Value);
|
||||
}
|
||||
|
||||
// Cleanup state from last OnAlloc. This is safe even if OnAlloc wasn't called.
|
||||
_currentWritableBuffer = null;
|
||||
_bufferHandle.Free();
|
||||
}
|
||||
|
||||
private async Task ApplyBackpressureAsync(WritableBufferAwaitable flushTask)
|
||||
{
|
||||
Log.ConnectionPause(ConnectionId);
|
||||
StopReading();
|
||||
_socket.ReadStop();
|
||||
|
||||
var result = await flushTask;
|
||||
|
||||
|
|
@ -209,11 +200,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
}
|
||||
}
|
||||
|
||||
private void StopReading()
|
||||
{
|
||||
_socket.ReadStop();
|
||||
}
|
||||
|
||||
private void StartReading()
|
||||
{
|
||||
try
|
||||
|
|
@ -223,7 +209,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
catch (UvException ex)
|
||||
{
|
||||
// ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected).
|
||||
// This should be treated the same as OnRead() seeing a "normalDone" condition.
|
||||
// This should be treated the same as OnRead() seeing a negative status.
|
||||
Log.ConnectionReadFin(ConnectionId);
|
||||
var error = new IOException(ex.Message, ex);
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ using Xunit;
|
|||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
||||
{
|
||||
public class ConnectionTests
|
||||
public class LibuvConnectionTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task DoesNotEndConnectionOnZeroRead()
|
||||
|
|
@ -42,7 +42,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored);
|
||||
}, (object)null);
|
||||
|
||||
var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync();
|
||||
var readAwaitable = mockConnectionHandler.Input.Reader.ReadAsync();
|
||||
Assert.False(readAwaitable.IsCompleted);
|
||||
}
|
||||
finally
|
||||
|
|
@ -175,5 +175,40 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|||
await thread.StopAsync(TimeSpan.FromSeconds(1));
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DoesNotThrowIfOnReadCallbackCalledWithEOFButAllocCallbackNotCalled()
|
||||
{
|
||||
var mockConnectionHandler = new MockConnectionHandler();
|
||||
var mockLibuv = new MockLibuv();
|
||||
var transportContext = new TestLibuvTransportContext() { ConnectionHandler = mockConnectionHandler };
|
||||
var transport = new LibuvTransport(mockLibuv, transportContext, null);
|
||||
var thread = new LibuvThread(transport);
|
||||
|
||||
try
|
||||
{
|
||||
await thread.StartAsync();
|
||||
await thread.PostAsync(_ =>
|
||||
{
|
||||
var listenerContext = new ListenerContext(transportContext)
|
||||
{
|
||||
Thread = thread
|
||||
};
|
||||
var socket = new MockSocket(mockLibuv, Thread.CurrentThread.ManagedThreadId, transportContext.Log);
|
||||
var connection = new LibuvConnection(listenerContext, socket);
|
||||
_ = connection.Start();
|
||||
|
||||
var ignored = new LibuvFunctions.uv_buf_t();
|
||||
mockLibuv.ReadCallback(socket.InternalGetHandle(), TestConstants.EOF, ref ignored);
|
||||
}, (object)null);
|
||||
|
||||
var readAwaitable = await mockConnectionHandler.Input.Reader.ReadAsync();
|
||||
Assert.True(readAwaitable.IsCompleted);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await thread.StopAsync(TimeSpan.FromSeconds(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue