Garbage collecting safe handles need to queue the uv_close

All of the uv_* calls must be called on the original thread
the finalizer thread cleaning up safehandle classes needs
special handling

see #16
This commit is contained in:
Louis DeJardin 2014-07-08 16:02:09 -07:00
parent 836be5565a
commit 2da561cb7a
10 changed files with 149 additions and 33 deletions

View File

@ -69,7 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
try
{
ListenSocket = new UvTcpHandle();
ListenSocket.Init(Thread.Loop);
ListenSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
ListenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
ListenSocket.Listen(10, _connectionCallback, this);
tcs.SetResult(0);
@ -85,7 +85,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private void OnConnection(UvStreamHandle listenSocket, int status)
{
var acceptSocket = new UvTcpHandle();
acceptSocket.Init(Thread.Loop);
acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
listenSocket.Accept(acceptSocket);
var connection = new Connection(this, acceptSocket);

View File

@ -4,6 +4,7 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
@ -21,6 +22,8 @@ namespace Microsoft.AspNet.Server.Kestrel
UvAsyncHandle _post;
Queue<Work> _workAdding = new Queue<Work>();
Queue<Work> _workRunning = new Queue<Work>();
Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
object _workSync = new Object();
bool _stopImmediate = false;
private ExceptionDispatchInfo _closeError;
@ -31,10 +34,13 @@ namespace Microsoft.AspNet.Server.Kestrel
_loop = new UvLoopHandle();
_post = new UvAsyncHandle();
_thread = new Thread(ThreadStart);
QueueCloseHandle = PostCloseHandle;
}
public UvLoopHandle Loop { get { return _loop; } }
public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; internal set; }
public Task StartAsync()
{
var tcs = new TaskCompletionSource<int>();
@ -92,6 +98,15 @@ namespace Microsoft.AspNet.Server.Kestrel
return tcs.Task;
}
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
lock (_workSync)
{
_closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle });
}
_post.Send();
}
private void ThreadStart(object parameter)
{
var tcs = (TaskCompletionSource<int>)parameter;
@ -119,7 +134,7 @@ namespace Microsoft.AspNet.Server.Kestrel
_post.Reference();
_post.DangerousClose();
_engine.Libuv.walk(
_loop,
_loop,
(ptr, arg) =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
@ -138,12 +153,19 @@ namespace Microsoft.AspNet.Server.Kestrel
private void OnPost()
{
var queue = _workAdding;
DoPostWork();
DoPostCloseHandle();
}
private void DoPostWork()
{
Queue<Work> queue;
lock (_workSync)
{
queue = _workAdding;
_workAdding = _workRunning;
_workRunning = queue;
}
_workRunning = queue;
while (queue.Count != 0)
{
var work = queue.Dequeue();
@ -156,7 +178,7 @@ namespace Microsoft.AspNet.Server.Kestrel
tcs =>
{
((TaskCompletionSource<int>)tcs).SetResult(0);
},
},
work.Completion);
}
}
@ -168,11 +190,33 @@ namespace Microsoft.AspNet.Server.Kestrel
}
else
{
// TODO: unobserved exception?
Trace.WriteLine("KestrelThread.DoPostWork " + ex.ToString());
}
}
}
}
private void DoPostCloseHandle()
{
Queue<CloseHandle> queue;
lock (_workSync)
{
queue = _closeHandleAdding;
_closeHandleAdding = _closeHandleRunning;
_closeHandleRunning = queue;
}
while (queue.Count != 0)
{
var closeHandle = queue.Dequeue();
try
{
closeHandle.Callback(closeHandle.Handle);
}
catch (Exception ex)
{
Trace.WriteLine("KestrelThread.DoPostCloseHandle " + ex.ToString());
}
}
}
private struct Work
{
@ -180,5 +224,10 @@ namespace Microsoft.AspNet.Server.Kestrel
public object State;
public TaskCompletionSource<int> Completion;
}
private struct CloseHandle
{
public Action<IntPtr> Callback;
public IntPtr Handle;
}
}
}

View File

@ -128,6 +128,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
handle.Validate(closed: true);
_uv_close(handle.InternalGetHandle(), close_cb);
}
public void close(IntPtr handle, uv_close_cb close_cb)
{
_uv_close(handle, close_cb);
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void uv_async_cb(IntPtr handle);

View File

@ -12,7 +12,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
public void Init(UvLoopHandle loop, Action callback)
{
CreateHandle(loop, loop.Libuv.handle_size(Libuv.HandleType.ASYNC));
CreateMemory(
loop.Libuv,
loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.ASYNC));
_callback = callback;
_uv.async_init(loop, this, _uv_async_cb);
}

View File

@ -2,20 +2,40 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
namespace Microsoft.AspNet.Server.Kestrel.Networking
{
public abstract class UvHandle : UvMemory
{
static Libuv.uv_close_cb _close_cb = DestroyHandle;
static Libuv.uv_close_cb _destroyMemory = DestroyMemory;
Action<Action<IntPtr>, IntPtr> _queueCloseHandle;
unsafe protected void CreateHandle(
Libuv uv,
int threadId,
int size,
Action<Action<IntPtr>, IntPtr> queueCloseHandle)
{
_queueCloseHandle = queueCloseHandle;
CreateMemory(uv, threadId, size);
}
protected override bool ReleaseHandle()
{
var memory = handle;
if (memory != IntPtr.Zero)
{
_uv.close(this, _close_cb);
handle = IntPtr.Zero;
if (Thread.CurrentThread.ManagedThreadId == ThreadId)
{
_uv.close(memory, _destroyMemory);
}
else
{
_queueCloseHandle(memory2 => _uv.close(memory2, _destroyMemory), memory);
}
}
return true;
}

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
namespace Microsoft.AspNet.Server.Kestrel.Networking
{
@ -9,7 +10,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
public void Init(Libuv uv)
{
CreateHandle(uv, uv.loop_size());
CreateMemory(
uv,
Thread.CurrentThread.ManagedThreadId,
uv.loop_size());
_uv.loop_init(this);
}
@ -30,7 +35,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
_uv.loop_close(this);
handle = IntPtr.Zero;
DestroyHandle(memory);
DestroyMemory(memory);
}
return true;
}

View File

@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
public abstract class UvMemory : SafeHandle
{
protected Libuv _uv;
int _threadId;
private int _threadId;
public UvMemory() : base(IntPtr.Zero, true)
{
@ -30,19 +30,36 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
}
}
unsafe protected void CreateHandle(Libuv uv, int size)
public int ThreadId
{
get
{
return _threadId;
}
private set
{
_threadId = value;
}
}
unsafe protected void CreateMemory(Libuv uv, int threadId, int size)
{
_uv = uv;
_threadId = Thread.CurrentThread.ManagedThreadId;
ThreadId = threadId;
handle = Marshal.AllocCoTaskMem(size);
*(IntPtr*)handle = GCHandle.ToIntPtr(GCHandle.Alloc(this, GCHandleType.Weak));
}
protected void CreateHandle(UvLoopHandle loop, int size)
unsafe protected static void DestroyMemory(IntPtr memory)
{
CreateHandle(loop._uv, size);
_threadId = loop._threadId;
var gcHandlePtr = *(IntPtr*)memory;
if (gcHandlePtr != IntPtr.Zero)
{
var gcHandle = GCHandle.FromIntPtr(gcHandlePtr);
gcHandle.Free();
}
Marshal.FreeCoTaskMem(memory);
}
internal IntPtr InternalGetHandle()
@ -57,16 +74,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
Trace.Assert(_threadId == Thread.CurrentThread.ManagedThreadId, "ThreadId is incorrect");
}
unsafe protected static void DestroyHandle(IntPtr memory)
{
var gcHandlePtr = *(IntPtr*)memory;
if (gcHandlePtr != IntPtr.Zero)
{
GCHandle.FromIntPtr(gcHandlePtr).Free();
}
Marshal.FreeCoTaskMem(memory);
}
unsafe public static THandle FromIntPtr<THandle>(IntPtr handle)
{
GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle);

View File

@ -17,7 +17,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
public void Init(UvLoopHandle loop)
{
CreateHandle(loop, loop.Libuv.req_size(Libuv.RequestType.SHUTDOWN));
CreateMemory(
loop.Libuv,
loop.ThreadId,
loop.Libuv.req_size(Libuv.RequestType.SHUTDOWN));
}
public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, object> callback, object state)

View File

@ -10,7 +10,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
public void Init(UvLoopHandle loop)
{
CreateHandle(loop, loop.Libuv.handle_size(Libuv.HandleType.TCP));
CreateMemory(
loop.Libuv,
loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.TCP));
_uv.tcp_init(loop, this);
}
public void Init(UvLoopHandle loop, Action<Action<IntPtr>, IntPtr> queueCloseHandle)
{
CreateHandle(
loop.Libuv,
loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.TCP), queueCloseHandle);
_uv.tcp_init(loop, this);
}

View File

@ -27,7 +27,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
var requestSize = loop.Libuv.req_size(Libuv.RequestType.WRITE);
var bufferSize = Marshal.SizeOf(typeof(Libuv.uv_buf_t)) * BUFFER_COUNT;
CreateHandle(loop, requestSize + bufferSize);
CreateMemory(
loop.Libuv,
loop.ThreadId,
requestSize + bufferSize);
_bufs = handle + requestSize;
}
@ -37,17 +40,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
Action<UvWriteReq, int, Exception, object> callback,
object state)
{
// add GCHandle to keeps this SafeHandle alive while request processing
_pins.Add(GCHandle.Alloc(this, GCHandleType.Normal));
var pBuffers = (Libuv.uv_buf_t*)_bufs;
var nBuffers = bufs.Count;
if (nBuffers > BUFFER_COUNT)
{
// create and pin buffer array when it's larger than the pre-allocated one
var bufArray = new Libuv.uv_buf_t[nBuffers];
var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject();
}
for (var index = 0; index != nBuffers; ++index)
{
// create and pin each segment being written
var buf = bufs.Array[bufs.Offset + index];
var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned);
@ -56,6 +65,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
gcHandle.AddrOfPinnedObject() + buf.Offset,
buf.Count);
}
_callback = callback;
_state = state;
_uv.write(this, handle, pBuffers, nBuffers, _uv_write_cb);
@ -97,7 +107,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
protected override bool ReleaseHandle()
{
DestroyHandle(handle);
DestroyMemory(handle);
handle = IntPtr.Zero;
return true;
}