From db44f5b672d4d21a1e471e9ca8f1247e36ba9a15 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 23 Apr 2017 20:22:49 -0700 Subject: [PATCH] Clean up LibuvOutputConsumer (#1744) * Clean up LibuvOutputConsumer - Added UvShutdownReq.ShutdownAsync - Added Debug.Assert in LibuvAwaitable since it should never race. --- .../Internal/LibuvAwaitable.cs | 10 +++- .../Internal/LibuvOutputConsumer.cs | 49 ++++++------------- .../Internal/Networking/UvShutdownReq.cs | 37 ++++++++++++-- 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvAwaitable.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvAwaitable.cs index da1a8661ef..5c038a4456 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvAwaitable.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvAwaitable.cs @@ -2,9 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal @@ -49,10 +49,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal public void OnCompleted(Action continuation) { + // There should never be a race between IsCompleted and OnCompleted since both operations + // should always be on the libuv thread + if (_callback == _callbackCompleted || Interlocked.CompareExchange(ref _callback, continuation, null) == _callbackCompleted) { - Task.Run(continuation); + Debug.Fail($"{typeof(LibuvAwaitable)}.{nameof(OnCompleted)} raced with {nameof(IsCompleted)}, running callback inline."); + + // Just run it inline + continuation(); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index cdcaccfc36..8b62da54ed 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -14,8 +14,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal private readonly UvStreamHandle _socket; private readonly string _connectionId; private readonly ILibuvTrace _log; - - private readonly WriteReqPool _writeReqPool; private readonly IPipeReader _pipe; public LibuvOutputConsumer( @@ -26,17 +24,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal ILibuvTrace log) { _pipe = pipe; - // We need to have empty pipe at this moment so callback - // get's scheduled _thread = thread; _socket = socket; _connectionId = connectionId; _log = log; - _writeReqPool = thread.WriteReqPool; } public async Task WriteOutputAsync() { + var pool = _thread.WriteReqPool; + while (true) { var result = await _pipe.ReadAsync(); @@ -46,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal { if (!buffer.IsEmpty) { - var writeReq = _writeReqPool.Allocate(); + var writeReq = pool.Allocate(); try { @@ -62,14 +59,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal finally { // Make sure we return the writeReq to the pool - _writeReqPool.Return(writeReq); + pool.Return(writeReq); } } if (result.IsCancelled) { // Send a FIN - await ShutdownAsync(); + _log.ConnectionWriteFin(_connectionId); + + using (var shutdownReq = new UvShutdownReq(_log)) + { + shutdownReq.Init(_thread); + var shutdownResult = await shutdownReq.ShutdownAsync(_socket); + + _log.ConnectionWroteFin(_connectionId, shutdownResult.Status); + } + // Ensure no data is written after uv_shutdown break; } @@ -105,32 +111,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal } } } - - private Task ShutdownAsync() - { - var tcs = new TaskCompletionSource(); - _log.ConnectionWriteFin(_connectionId); - - var shutdownReq = new UvShutdownReq(_log); - try - { - shutdownReq.Init(_thread); - shutdownReq.Shutdown(_socket, (req, status, state) => - { - req.Dispose(); - _log.ConnectionWroteFin(_connectionId, status); - - tcs.TrySetResult(null); - }, - this); - } - catch (Exception) - { - shutdownReq.Dispose(); - throw; - } - - return tcs.Task; - } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Networking/UvShutdownReq.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Networking/UvShutdownReq.cs index 19af091f01..3b2f74e53c 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Networking/UvShutdownReq.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Networking/UvShutdownReq.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking { @@ -12,10 +13,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin { private readonly static LibuvFunctions.uv_shutdown_cb _uv_shutdown_cb = UvShutdownCb; - private Action _callback; + private Action _callback; private object _state; + private LibuvAwaitable _awaitable = new LibuvAwaitable(); - public UvShutdownReq(ILibuvTrace logger) : base (logger) + public UvShutdownReq(ILibuvTrace logger) : base(logger) { } @@ -24,14 +26,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin var loop = thread.Loop; CreateMemory( - loop.Libuv, + loop.Libuv, loop.ThreadId, loop.Libuv.req_size(LibuvFunctions.RequestType.SHUTDOWN)); base.Init(thread); } - public void Shutdown(UvStreamHandle handle, Action callback, object state) + public LibuvAwaitable ShutdownAsync(UvStreamHandle handle) + { + Shutdown(handle, LibuvAwaitable.Callback, _awaitable); + return _awaitable; + } + + public void Shutdown(UvStreamHandle handle, Action callback, object state) { _callback = callback; _state = state; @@ -41,9 +49,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin private static void UvShutdownCb(IntPtr ptr, int status) { var req = FromIntPtr(ptr); - req._callback(req, status, req._state); + + var callback = req._callback; req._callback = null; + + var state = req._state; req._state = null; + + Exception error = null; + if (status < 0) + { + req.Libuv.Check(status, out error); + } + + try + { + callback(req, status, error, state); + } + catch (Exception ex) + { + req._log.LogError(0, ex, "UvShutdownCb"); + throw; + } } } } \ No newline at end of file