Clean up LibuvOutputConsumer (#1744)

* Clean up LibuvOutputConsumer
- Added UvShutdownReq.ShutdownAsync
- Added Debug.Assert in LibuvAwaitable since it should never race.
This commit is contained in:
David Fowler 2017-04-23 20:22:49 -07:00 committed by GitHub
parent 650a3ccc26
commit db44f5b672
3 changed files with 54 additions and 42 deletions

View File

@ -2,9 +2,9 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
@ -49,10 +49,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
public void OnCompleted(Action continuation)
{
// There should never be a race between IsCompleted and OnCompleted since both operations
// should always be on the libuv thread
if (_callback == _callbackCompleted ||
Interlocked.CompareExchange(ref _callback, continuation, null) == _callbackCompleted)
{
Task.Run(continuation);
Debug.Fail($"{typeof(LibuvAwaitable<TRequest>)}.{nameof(OnCompleted)} raced with {nameof(IsCompleted)}, running callback inline.");
// Just run it inline
continuation();
}
}

View File

@ -14,8 +14,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
private readonly UvStreamHandle _socket;
private readonly string _connectionId;
private readonly ILibuvTrace _log;
private readonly WriteReqPool _writeReqPool;
private readonly IPipeReader _pipe;
public LibuvOutputConsumer(
@ -26,17 +24,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
ILibuvTrace log)
{
_pipe = pipe;
// We need to have empty pipe at this moment so callback
// get's scheduled
_thread = thread;
_socket = socket;
_connectionId = connectionId;
_log = log;
_writeReqPool = thread.WriteReqPool;
}
public async Task WriteOutputAsync()
{
var pool = _thread.WriteReqPool;
while (true)
{
var result = await _pipe.ReadAsync();
@ -46,7 +43,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
{
if (!buffer.IsEmpty)
{
var writeReq = _writeReqPool.Allocate();
var writeReq = pool.Allocate();
try
{
@ -62,14 +59,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
finally
{
// Make sure we return the writeReq to the pool
_writeReqPool.Return(writeReq);
pool.Return(writeReq);
}
}
if (result.IsCancelled)
{
// Send a FIN
await ShutdownAsync();
_log.ConnectionWriteFin(_connectionId);
using (var shutdownReq = new UvShutdownReq(_log))
{
shutdownReq.Init(_thread);
var shutdownResult = await shutdownReq.ShutdownAsync(_socket);
_log.ConnectionWroteFin(_connectionId, shutdownResult.Status);
}
// Ensure no data is written after uv_shutdown
break;
}
@ -105,32 +111,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
}
private Task ShutdownAsync()
{
var tcs = new TaskCompletionSource<object>();
_log.ConnectionWriteFin(_connectionId);
var shutdownReq = new UvShutdownReq(_log);
try
{
shutdownReq.Init(_thread);
shutdownReq.Shutdown(_socket, (req, status, state) =>
{
req.Dispose();
_log.ConnectionWroteFin(_connectionId, status);
tcs.TrySetResult(null);
},
this);
}
catch (Exception)
{
shutdownReq.Dispose();
throw;
}
return tcs.Task;
}
}
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking
{
@ -12,10 +13,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin
{
private readonly static LibuvFunctions.uv_shutdown_cb _uv_shutdown_cb = UvShutdownCb;
private Action<UvShutdownReq, int, object> _callback;
private Action<UvShutdownReq, int, Exception, object> _callback;
private object _state;
private LibuvAwaitable<UvShutdownReq> _awaitable = new LibuvAwaitable<UvShutdownReq>();
public UvShutdownReq(ILibuvTrace logger) : base (logger)
public UvShutdownReq(ILibuvTrace logger) : base(logger)
{
}
@ -24,14 +26,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin
var loop = thread.Loop;
CreateMemory(
loop.Libuv,
loop.Libuv,
loop.ThreadId,
loop.Libuv.req_size(LibuvFunctions.RequestType.SHUTDOWN));
base.Init(thread);
}
public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, object> callback, object state)
public LibuvAwaitable<UvShutdownReq> ShutdownAsync(UvStreamHandle handle)
{
Shutdown(handle, LibuvAwaitable<UvShutdownReq>.Callback, _awaitable);
return _awaitable;
}
public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, Exception, object> callback, object state)
{
_callback = callback;
_state = state;
@ -41,9 +49,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networkin
private static void UvShutdownCb(IntPtr ptr, int status)
{
var req = FromIntPtr<UvShutdownReq>(ptr);
req._callback(req, status, req._state);
var callback = req._callback;
req._callback = null;
var state = req._state;
req._state = null;
Exception error = null;
if (status < 0)
{
req.Libuv.Check(status, out error);
}
try
{
callback(req, status, error, state);
}
catch (Exception ex)
{
req._log.LogError(0, ex, "UvShutdownCb");
throw;
}
}
}
}