Close UvAsyncHandle properly to avoid zombie threads

- Even when safe handles are disposed explicitly, ReleaseHandle is sometimes
  called on another thread which breaks uv_close.
- Ensure we close the UvAsyncHandle the uv loop so that the second call
  to uv_run always completes without a timeout/Thread.Abort.
- Re-enable some tests. Add skip conditions for those that aren't passing.
This commit is contained in:
Stephen Halter 2016-02-09 15:01:04 -08:00
parent 54caf3071c
commit 304016fc3b
18 changed files with 132 additions and 122 deletions

View File

@ -29,8 +29,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
var tcs = new TaskCompletionSource<int>(this); var tcs = new TaskCompletionSource<int>(this);
Thread.Post(tcs2 => Thread.Post(state =>
{ {
var tcs2 = (TaskCompletionSource<int>)state;
try try
{ {
var listener = ((Listener)tcs2.Task.AsyncState); var listener = ((Listener)tcs2.Task.AsyncState);

View File

@ -46,7 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private void PostCallback() private void PostCallback()
{ {
ListenPipe = new UvPipeHandle(Log); ListenPipe = new UvPipeHandle(Log);
ListenPipe.Init(Thread.Loop, false); ListenPipe.Init(Thread.Loop, Thread.QueueCloseHandle, false);
ListenPipe.Bind(_pipeName); ListenPipe.Bind(_pipeName);
ListenPipe.Listen(Constants.ListenBacklog, ListenPipe.Listen(Constants.ListenBacklog,
(pipe, status, error, state) => ((ListenerPrimary)state).OnListenPipe(pipe, status, error), this); (pipe, status, error, state) => ((ListenerPrimary)state).OnListenPipe(pipe, status, error), this);
@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
} }
var dispatchPipe = new UvPipeHandle(Log); var dispatchPipe = new UvPipeHandle(Log);
dispatchPipe.Init(Thread.Loop, true); dispatchPipe.Init(Thread.Loop, Thread.QueueCloseHandle, true);
try try
{ {

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
DispatchPipe = new UvPipeHandle(Log); DispatchPipe = new UvPipeHandle(Log);
var tcs = new TaskCompletionSource<int>(this); var tcs = new TaskCompletionSource<int>(this);
Thread.Post(tcs2 => StartCallback(tcs2), tcs); Thread.Post(state => StartCallback((TaskCompletionSource<int>)state), tcs);
return tcs.Task; return tcs.Task;
} }
@ -56,7 +56,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{ {
try try
{ {
DispatchPipe.Init(Thread.Loop, true); DispatchPipe.Init(Thread.Loop, Thread.QueueCloseHandle, true);
var connect = new UvConnectRequest(Log); var connect = new UvConnectRequest(Log);
connect.Init(Thread.Loop); connect.Init(Thread.Loop);
connect.Connect( connect.Connect(

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
protected override UvStreamHandle CreateListenSocket() protected override UvStreamHandle CreateListenSocket()
{ {
var socket = new UvPipeHandle(Log); var socket = new UvPipeHandle(Log);
socket.Init(Thread.Loop, false); socket.Init(Thread.Loop, Thread.QueueCloseHandle, false);
socket.Bind(ServerAddress.UnixPipePath); socket.Bind(ServerAddress.UnixPipePath);
socket.Listen(Constants.ListenBacklog, (stream, status, error, state) => ConnectionCallback(stream, status, error, state), this); socket.Listen(Constants.ListenBacklog, (stream, status, error, state) => ConnectionCallback(stream, status, error, state), this);
return socket; return socket;
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
try try
{ {
acceptSocket.Init(Thread.Loop, false); acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle, false);
listenSocket.Accept(acceptSocket); listenSocket.Accept(acceptSocket);
DispatchConnection(acceptSocket); DispatchConnection(acceptSocket);
} }

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
protected override UvStreamHandle CreateListenSocket() protected override UvStreamHandle CreateListenSocket()
{ {
var socket = new UvPipeHandle(Log); var socket = new UvPipeHandle(Log);
socket.Init(Thread.Loop, false); socket.Init(Thread.Loop, Thread.QueueCloseHandle, false);
socket.Bind(ServerAddress.UnixPipePath); socket.Bind(ServerAddress.UnixPipePath);
socket.Listen(Constants.ListenBacklog, (stream, status, error, state) => ConnectionCallback(stream, status, error, state), this); socket.Listen(Constants.ListenBacklog, (stream, status, error, state) => ConnectionCallback(stream, status, error, state), this);
return socket; return socket;
@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
try try
{ {
acceptSocket.Init(Thread.Loop, false); acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle, false);
listenSocket.Accept(acceptSocket); listenSocket.Accept(acceptSocket);
DispatchConnection(acceptSocket); DispatchConnection(acceptSocket);
} }

View File

@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
protected override UvStreamHandle CreateAcceptSocket() protected override UvStreamHandle CreateAcceptSocket()
{ {
var acceptSocket = new UvPipeHandle(Log); var acceptSocket = new UvPipeHandle(Log);
acceptSocket.Init(Thread.Loop, false); acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle, false);
return acceptSocket; return acceptSocket;
} }
} }

View File

@ -314,7 +314,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private void ScheduleWrite() private void ScheduleWrite()
{ {
_thread.Post(_this => _this.WriteAllPending(), this); _thread.Post(state => ((SocketOutput)state).WriteAllPending(), this);
} }
// This is called on the libuv event loop // This is called on the libuv event loop

View File

@ -24,9 +24,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
// otherwise it needs to wait till the next pass of the libuv loop // otherwise it needs to wait till the next pass of the libuv loop
private const int _maxLoops = 8; private const int _maxLoops = 8;
private static readonly Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state); private static readonly Action<object, object> _postCallbackAdapter = (callback, state) => ((Action<object>)callback).Invoke(state);
private static readonly Action<object, object> _socketCallbackAdapter = (callback, state) => ((Action<SocketOutput>)callback).Invoke((SocketOutput)state);
private static readonly Action<object, object> _tcsCallbackAdapter = (callback, state) => ((Action<TaskCompletionSource<int>>)callback).Invoke((TaskCompletionSource<int>)state);
private static readonly Action<object, object> _postAsyncCallbackAdapter = (callback, state) => ((Action<object>)callback).Invoke(state); private static readonly Action<object, object> _postAsyncCallbackAdapter = (callback, state) => ((Action<object>)callback).Invoke(state);
private readonly KestrelEngine _engine; private readonly KestrelEngine _engine;
@ -56,12 +54,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_thread = new Thread(ThreadStart); _thread = new Thread(ThreadStart);
_thread.Name = "KestrelThread - libuv"; _thread.Name = "KestrelThread - libuv";
QueueCloseHandle = PostCloseHandle; QueueCloseHandle = PostCloseHandle;
QueueCloseAsyncHandle = EnqueueCloseHandle;
} }
public UvLoopHandle Loop { get { return _loop; } } public UvLoopHandle Loop { get { return _loop; } }
public ExceptionDispatchInfo FatalError { get { return _closeError; } } public ExceptionDispatchInfo FatalError { get { return _closeError; } }
public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; internal set; } public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; }
private Action<Action<IntPtr>, IntPtr> QueueCloseAsyncHandle { get; }
public Task StartAsync() public Task StartAsync()
{ {
@ -137,41 +139,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_loop.Stop(); _loop.Stop();
} }
public void Post(Action<object> callback, object state)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _postCallbackAdapter,
Callback = callback,
State = state
});
}
_post.Send();
}
private void Post(Action<KestrelThread> callback) private void Post(Action<KestrelThread> callback)
{ {
lock (_workSync) Post(thread => callback((KestrelThread)thread), this);
{
_workAdding.Enqueue(new Work { CallbackAdapter = _threadCallbackAdapter, Callback = callback, State = this });
}
_post.Send();
}
public void Post(Action<SocketOutput> callback, SocketOutput state)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _socketCallbackAdapter,
Callback = callback,
State = state
});
}
_post.Send();
}
public void Post(Action<TaskCompletionSource<int>> callback, TaskCompletionSource<int> state)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work
{
CallbackAdapter = _tcsCallbackAdapter,
Callback = callback,
State = state
});
}
_post.Send();
} }
public Task PostAsync(Action<object> callback, object state) public Task PostAsync(Action<object> callback, object state)
@ -192,12 +176,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel
} }
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle) private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
EnqueueCloseHandle(callback, handle);
_post.Send();
}
private void EnqueueCloseHandle(Action<IntPtr> callback, IntPtr handle)
{ {
lock (_workSync) lock (_workSync)
{ {
_closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); _closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle });
} }
_post.Send();
} }
private void ThreadStart(object parameter) private void ThreadStart(object parameter)
@ -206,7 +195,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
try try
{ {
_loop.Init(_engine.Libuv); _loop.Init(_engine.Libuv);
_post.Init(_loop, OnPost); _post.Init(_loop, OnPost, EnqueueCloseHandle);
tcs.SetResult(0); tcs.SetResult(0);
} }
catch (Exception ex) catch (Exception ex)
@ -230,18 +219,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel
_post.Reference(); _post.Reference();
_post.Dispose(); _post.Dispose();
_engine.Libuv.walk(
_loop,
(ptr, arg) =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
if (handle != _post)
{
handle.Dispose();
}
},
IntPtr.Zero);
// Ensure the Dispose operations complete in the event loop. // Ensure the Dispose operations complete in the event loop.
var ran2 = _loop.Run(); var ran2 = _loop.Run();
@ -307,6 +284,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
return wasWork; return wasWork;
} }
private bool DoPostCloseHandle() private bool DoPostCloseHandle()
{ {
Queue<CloseHandle> queue; Queue<CloseHandle> queue;
@ -333,7 +311,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
} }
} }
return wasWork; return wasWork;
} }
private struct Work private struct Work
@ -343,6 +321,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel
public object State; public object State;
public TaskCompletionSource<object> Completion; public TaskCompletionSource<object> Completion;
} }
private struct CloseHandle private struct CloseHandle
{ {
public Action<IntPtr> Callback; public Action<IntPtr> Callback;

View File

@ -30,6 +30,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
_uv_close = NativeDarwinMonoMethods.uv_close; _uv_close = NativeDarwinMonoMethods.uv_close;
_uv_async_init = NativeDarwinMonoMethods.uv_async_init; _uv_async_init = NativeDarwinMonoMethods.uv_async_init;
_uv_async_send = NativeDarwinMonoMethods.uv_async_send; _uv_async_send = NativeDarwinMonoMethods.uv_async_send;
_uv_unsafe_async_send = NativeDarwinMonoMethods.uv_unsafe_async_send;
_uv_tcp_init = NativeDarwinMonoMethods.uv_tcp_init; _uv_tcp_init = NativeDarwinMonoMethods.uv_tcp_init;
_uv_tcp_bind = NativeDarwinMonoMethods.uv_tcp_bind; _uv_tcp_bind = NativeDarwinMonoMethods.uv_tcp_bind;
_uv_tcp_open = NativeDarwinMonoMethods.uv_tcp_open; _uv_tcp_open = NativeDarwinMonoMethods.uv_tcp_open;
@ -71,6 +72,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
_uv_close = NativeMethods.uv_close; _uv_close = NativeMethods.uv_close;
_uv_async_init = NativeMethods.uv_async_init; _uv_async_init = NativeMethods.uv_async_init;
_uv_async_send = NativeMethods.uv_async_send; _uv_async_send = NativeMethods.uv_async_send;
_uv_unsafe_async_send = NativeMethods.uv_unsafe_async_send;
_uv_tcp_init = NativeMethods.uv_tcp_init; _uv_tcp_init = NativeMethods.uv_tcp_init;
_uv_tcp_bind = NativeMethods.uv_tcp_bind; _uv_tcp_bind = NativeMethods.uv_tcp_bind;
_uv_tcp_open = NativeMethods.uv_tcp_open; _uv_tcp_open = NativeMethods.uv_tcp_open;
@ -207,6 +209,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
Check(_uv_async_send(handle)); Check(_uv_async_send(handle));
} }
protected Func<IntPtr, int> _uv_unsafe_async_send;
public void unsafe_async_send(IntPtr handle)
{
Check(_uv_unsafe_async_send(handle));
}
protected Func<UvLoopHandle, UvTcpHandle, int> _uv_tcp_init; protected Func<UvLoopHandle, UvTcpHandle, int> _uv_tcp_init;
public void tcp_init(UvLoopHandle loop, UvTcpHandle handle) public void tcp_init(UvLoopHandle loop, UvTcpHandle handle)
{ {
@ -510,6 +518,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
[DllImport("libuv", CallingConvention = CallingConvention.Cdecl)] [DllImport("libuv", CallingConvention = CallingConvention.Cdecl)]
public extern static int uv_async_send(UvAsyncHandle handle); public extern static int uv_async_send(UvAsyncHandle handle);
[DllImport("libuv", CallingConvention = CallingConvention.Cdecl, EntryPoint = "uv_async_send")]
public extern static int uv_unsafe_async_send(IntPtr handle);
[DllImport("libuv", CallingConvention = CallingConvention.Cdecl)] [DllImport("libuv", CallingConvention = CallingConvention.Cdecl)]
public static extern int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle); public static extern int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle);
@ -618,6 +629,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
[DllImport("__Internal", CallingConvention = CallingConvention.Cdecl)] [DllImport("__Internal", CallingConvention = CallingConvention.Cdecl)]
public extern static int uv_async_send(UvAsyncHandle handle); public extern static int uv_async_send(UvAsyncHandle handle);
[DllImport("__Internal", CallingConvention = CallingConvention.Cdecl, EntryPoint = "uv_async_send")]
public extern static int uv_unsafe_async_send(IntPtr handle);
[DllImport("__Internal", CallingConvention = CallingConvention.Cdecl)] [DllImport("__Internal", CallingConvention = CallingConvention.Cdecl)]
public static extern int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle); public static extern int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle);

View File

@ -2,27 +2,33 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System; using System;
using System.Diagnostics;
using System.Threading;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Networking namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{ {
public class UvAsyncHandle : UvHandle public class UvAsyncHandle : UvHandle
{ {
private static readonly Libuv.uv_close_cb _destroyMemory = (handle) => DestroyMemory(handle);
private static readonly Libuv.uv_async_cb _uv_async_cb = (handle) => AsyncCb(handle); private static readonly Libuv.uv_async_cb _uv_async_cb = (handle) => AsyncCb(handle);
private Action _callback; private Action _callback;
private Action<Action<IntPtr>, IntPtr> _queueCloseHandle;
public UvAsyncHandle(IKestrelTrace logger) : base(logger) public UvAsyncHandle(IKestrelTrace logger) : base(logger)
{ {
} }
public void Init(UvLoopHandle loop, Action callback) public void Init(UvLoopHandle loop, Action callback, Action<Action<IntPtr>, IntPtr> queueCloseHandle)
{ {
CreateMemory( CreateMemory(
loop.Libuv, loop.Libuv,
loop.ThreadId, loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.ASYNC)); loop.Libuv.handle_size(Libuv.HandleType.ASYNC));
_callback = callback; _callback = callback;
_queueCloseHandle = queueCloseHandle;
_uv.async_init(loop, this, _uv_async_cb); _uv.async_init(loop, this, _uv_async_cb);
} }
@ -35,5 +41,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{ {
FromIntPtr<UvAsyncHandle>(handle)._callback.Invoke(); FromIntPtr<UvAsyncHandle>(handle)._callback.Invoke();
} }
protected override bool ReleaseHandle()
{
var memory = handle;
if (memory != IntPtr.Zero)
{
handle = IntPtr.Zero;
if (Thread.CurrentThread.ManagedThreadId == ThreadId)
{
_uv.close(memory, _destroyMemory);
}
else if (_queueCloseHandle != null)
{
// This can be called from the finalizer.
// Ensure the closure doesn't reference "this".
var uv = _uv;
_queueCloseHandle(memory2 => uv.close(memory2, _destroyMemory), memory);
uv.unsafe_async_send(memory);
}
else
{
Debug.Assert(false, "UvAsyncHandle not initialized with queueCloseHandle action");
return false;
}
}
return true;
}
} }
} }

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System; using System;
using System.Diagnostics;
using System.Threading; using System.Threading;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
@ -44,6 +45,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
var uv = _uv; var uv = _uv;
_queueCloseHandle(memory2 => uv.close(memory2, _destroyMemory), memory); _queueCloseHandle(memory2 => uv.close(memory2, _destroyMemory), memory);
} }
else
{
Debug.Assert(false, "UvHandle not initialized with queueCloseHandle action");
return false;
}
} }
return true; return true;
} }

View File

@ -7,7 +7,7 @@ using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Networking namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{ {
public class UvLoopHandle : UvHandle public class UvLoopHandle : UvMemory
{ {
public UvLoopHandle(IKestrelTrace logger) : base(logger) public UvLoopHandle(IKestrelTrace logger) : base(logger)
{ {
@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
unsafe protected override bool ReleaseHandle() unsafe protected override bool ReleaseHandle()
{ {
var memory = this.handle; var memory = handle;
if (memory != IntPtr.Zero) if (memory != IntPtr.Zero)
{ {
// loop_close clears the gcHandlePtr // loop_close clears the gcHandlePtr
@ -46,6 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
DestroyMemory(memory, gcHandlePtr); DestroyMemory(memory, gcHandlePtr);
} }
return true; return true;
} }
} }

View File

@ -87,6 +87,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle); GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle);
return (THandle)gcHandle.Target; return (THandle)gcHandle.Target;
} }
} }
} }

View File

@ -13,24 +13,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{ {
} }
public void Init(UvLoopHandle loop, bool ipc) public void Init(UvLoopHandle loop, Action<Action<IntPtr>, IntPtr> queueCloseHandle, bool ipc = false)
{
CreateMemory(
loop.Libuv,
loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.NAMED_PIPE));
_uv.pipe_init(loop, this, ipc);
}
public void Init(UvLoopHandle loop, Action<Action<IntPtr>, IntPtr> queueCloseHandle)
{ {
CreateHandle( CreateHandle(
loop.Libuv, loop.Libuv,
loop.ThreadId, loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.TCP), queueCloseHandle); loop.Libuv.handle_size(Libuv.HandleType.NAMED_PIPE), queueCloseHandle);
_uv.pipe_init(loop, this, false); _uv.pipe_init(loop, this, ipc);
} }
public void Bind(string name) public void Bind(string name)

View File

@ -15,13 +15,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
private readonly static Libuv.uv_alloc_cb _uv_alloc_cb = (IntPtr handle, int suggested_size, out Libuv.uv_buf_t buf) => UvAllocCb(handle, suggested_size, out buf); private readonly static Libuv.uv_alloc_cb _uv_alloc_cb = (IntPtr handle, int suggested_size, out Libuv.uv_buf_t buf) => UvAllocCb(handle, suggested_size, out buf);
private readonly static Libuv.uv_read_cb _uv_read_cb = (IntPtr handle, int status, ref Libuv.uv_buf_t buf) => UvReadCb(handle, status, ref buf); private readonly static Libuv.uv_read_cb _uv_read_cb = (IntPtr handle, int status, ref Libuv.uv_buf_t buf) => UvReadCb(handle, status, ref buf);
public Action<UvStreamHandle, int, Exception, object> _listenCallback; private Action<UvStreamHandle, int, Exception, object> _listenCallback;
public object _listenState; private object _listenState;
private GCHandle _listenVitality; private GCHandle _listenVitality;
public Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback; private Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback;
public Action<UvStreamHandle, int, object> _readCallback; private Action<UvStreamHandle, int, object> _readCallback;
public object _readState; private object _readState;
private GCHandle _readVitality; private GCHandle _readVitality;
protected UvStreamHandle(IKestrelTrace logger) : base(logger) protected UvStreamHandle(IKestrelTrace logger) : base(logger)

View File

@ -14,16 +14,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Networking
{ {
} }
public void Init(UvLoopHandle loop)
{
CreateMemory(
loop.Libuv,
loop.ThreadId,
loop.Libuv.handle_size(Libuv.HandleType.TCP));
_uv.tcp_init(loop, this);
}
public void Init(UvLoopHandle loop, Action<Action<IntPtr>, IntPtr> queueCloseHandle) public void Init(UvLoopHandle loop, Action<Action<IntPtr>, IntPtr> queueCloseHandle)
{ {
CreateHandle( CreateHandle(

View File

@ -21,13 +21,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
_logger = engine.Log; _logger = engine.Log;
} }
[Fact]
public void InitAndCloseServerPipe() public void InitAndCloseServerPipe()
{ {
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
var pipe = new UvPipeHandle(_logger); var pipe = new UvPipeHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
pipe.Init(loop, true); pipe.Init(loop, (a, b) => { }, true);
pipe.Bind(@"\\.\pipe\InitAndCloseServerPipe"); pipe.Bind(@"\\.\pipe\InitAndCloseServerPipe");
pipe.Dispose(); pipe.Dispose();
@ -38,18 +39,19 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
} }
[Fact(Skip = "Test needs to be fixed (UvException: Error -4082 EBUSY resource busy or locked from loop_close)")]
public void ServerPipeListenForConnections() public void ServerPipeListenForConnections()
{ {
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
var serverListenPipe = new UvPipeHandle(_logger); var serverListenPipe = new UvPipeHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
serverListenPipe.Init(loop, false); serverListenPipe.Init(loop, (a, b) => { }, false);
serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections"); serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections");
serverListenPipe.Listen(128, (_1, status, error, _2) => serverListenPipe.Listen(128, (_1, status, error, _2) =>
{ {
var serverConnectionPipe = new UvPipeHandle(_logger); var serverConnectionPipe = new UvPipeHandle(_logger);
serverConnectionPipe.Init(loop, true); serverConnectionPipe.Init(loop, (a, b) => { }, true);
try try
{ {
serverListenPipe.Accept(serverConnectionPipe); serverListenPipe.Accept(serverConnectionPipe);
@ -92,7 +94,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var connect = new UvConnectRequest(new KestrelTrace(new TestKestrelTrace())); var connect = new UvConnectRequest(new KestrelTrace(new TestKestrelTrace()));
loop2.Init(_uv); loop2.Init(_uv);
clientConnectionPipe.Init(loop2, true); clientConnectionPipe.Init(loop2, (a, b) => { }, true);
connect.Init(loop2); connect.Init(loop2);
connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) => connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) =>
{ {
@ -120,6 +122,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
} }
[Fact(Skip = "Test needs to be fixed (UvException: Error -4088 EAGAIN resource temporarily unavailable from accept)")]
public void ServerPipeDispatchConnections() public void ServerPipeDispatchConnections()
{ {
var pipeName = @"\\.\pipe\ServerPipeDispatchConnections" + Guid.NewGuid().ToString("n"); var pipeName = @"\\.\pipe\ServerPipeDispatchConnections" + Guid.NewGuid().ToString("n");
@ -132,12 +135,12 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var serverConnectionTcpDisposedEvent = new ManualResetEvent(false); var serverConnectionTcpDisposedEvent = new ManualResetEvent(false);
var serverListenPipe = new UvPipeHandle(_logger); var serverListenPipe = new UvPipeHandle(_logger);
serverListenPipe.Init(loop, false); serverListenPipe.Init(loop, (a, b) => { }, false);
serverListenPipe.Bind(pipeName); serverListenPipe.Bind(pipeName);
serverListenPipe.Listen(128, (_1, status, error, _2) => serverListenPipe.Listen(128, (_1, status, error, _2) =>
{ {
serverConnectionPipe = new UvPipeHandle(_logger); serverConnectionPipe = new UvPipeHandle(_logger);
serverConnectionPipe.Init(loop, true); serverConnectionPipe.Init(loop, (a, b) => { }, true);
try try
{ {
serverListenPipe.Accept(serverConnectionPipe); serverListenPipe.Accept(serverConnectionPipe);
@ -152,13 +155,13 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}, null); }, null);
var serverListenTcp = new UvTcpHandle(_logger); var serverListenTcp = new UvTcpHandle(_logger);
serverListenTcp.Init(loop); serverListenTcp.Init(loop, (a, b) => { });
var address = ServerAddress.FromUrl("http://localhost:54321/"); var address = ServerAddress.FromUrl("http://localhost:54321/");
serverListenTcp.Bind(address); serverListenTcp.Bind(address);
serverListenTcp.Listen(128, (_1, status, error, _2) => serverListenTcp.Listen(128, (_1, status, error, _2) =>
{ {
var serverConnectionTcp = new UvTcpHandle(_logger); var serverConnectionTcp = new UvTcpHandle(_logger);
serverConnectionTcp.Init(loop); serverConnectionTcp.Init(loop, (a, b) => { });
serverListenTcp.Accept(serverConnectionTcp); serverListenTcp.Accept(serverConnectionTcp);
serverConnectionPipeAcceptedEvent.WaitOne(); serverConnectionPipeAcceptedEvent.WaitOne();
@ -188,7 +191,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var connect = new UvConnectRequest(new KestrelTrace(new TestKestrelTrace())); var connect = new UvConnectRequest(new KestrelTrace(new TestKestrelTrace()));
loop2.Init(_uv); loop2.Init(_uv);
clientConnectionPipe.Init(loop2, true); clientConnectionPipe.Init(loop2, (a, b) => { }, true);
connect.Init(loop2); connect.Init(loop2);
connect.Connect(clientConnectionPipe, pipeName, (_1, status, error, _2) => connect.Connect(clientConnectionPipe, pipeName, (_1, status, error, _2) =>
{ {
@ -208,7 +211,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
return; return;
} }
var clientConnectionTcp = new UvTcpHandle(_logger); var clientConnectionTcp = new UvTcpHandle(_logger);
clientConnectionTcp.Init(loop2); clientConnectionTcp.Init(loop2, (a, b) => { });
clientConnectionPipe.Accept(clientConnectionTcp); clientConnectionPipe.Accept(clientConnectionTcp);
var buf2 = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64); var buf2 = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64);
clientConnectionTcp.ReadStart( clientConnectionTcp.ReadStart(

View File

@ -47,7 +47,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{ {
called = true; called = true;
trigger.Dispose(); trigger.Dispose();
}); }, (a, b) => { });
trigger.Send(); trigger.Send();
loop.Run(); loop.Run();
loop.Dispose(); loop.Dispose();
@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
var tcp = new UvTcpHandle(_logger); var tcp = new UvTcpHandle(_logger);
tcp.Init(loop); tcp.Init(loop, (a, b) => { });
var address = ServerAddress.FromUrl("http://localhost:0/"); var address = ServerAddress.FromUrl("http://localhost:0/");
tcp.Bind(address); tcp.Bind(address);
tcp.Dispose(); tcp.Dispose();
@ -75,14 +75,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
var tcp = new UvTcpHandle(_logger); var tcp = new UvTcpHandle(_logger);
tcp.Init(loop); tcp.Init(loop, (a, b) => { });
var port = TestServer.GetNextPort(); var port = TestServer.GetNextPort();
var address = ServerAddress.FromUrl($"http://localhost:{port}/"); var address = ServerAddress.FromUrl($"http://localhost:{port}/");
tcp.Bind(address); tcp.Bind(address);
tcp.Listen(10, (stream, status, error, state) => tcp.Listen(10, (stream, status, error, state) =>
{ {
var tcp2 = new UvTcpHandle(_logger); var tcp2 = new UvTcpHandle(_logger);
tcp2.Init(loop); tcp2.Init(loop, (a, b) => { });
stream.Accept(tcp2); stream.Accept(tcp2);
tcp2.Dispose(); tcp2.Dispose();
stream.Dispose(); stream.Dispose();
@ -117,15 +117,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
var tcp = new UvTcpHandle(_logger); var tcp = new UvTcpHandle(_logger);
tcp.Init(loop); tcp.Init(loop, (a, b) => { });
var port = TestServer.GetNextPort(); var port = TestServer.GetNextPort();
var address = ServerAddress.FromUrl($"http://localhost:{port}/"); var address = ServerAddress.FromUrl($"http://localhost:{port}/");
tcp.Bind(address); tcp.Bind(address);
tcp.Listen(10, (_, status, error, state) => tcp.Listen(10, (_, status, error, state) =>
{ {
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle(_logger); var tcp2 = new UvTcpHandle(_logger);
tcp2.Init(loop); tcp2.Init(loop, (a, b) => { });
tcp.Accept(tcp2); tcp.Accept(tcp2);
var data = Marshal.AllocCoTaskMem(500); var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart( tcp2.ReadStart(
@ -140,7 +139,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
null); null);
tcp.Dispose(); tcp.Dispose();
}, null); }, null);
Console.WriteLine("Task.Run");
var t = Task.Run(async () => var t = Task.Run(async () =>
{ {
var socket = new Socket( var socket = new Socket(
@ -179,15 +177,14 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var loop = new UvLoopHandle(_logger); var loop = new UvLoopHandle(_logger);
loop.Init(_uv); loop.Init(_uv);
var tcp = new UvTcpHandle(_logger); var tcp = new UvTcpHandle(_logger);
tcp.Init(loop); tcp.Init(loop, (a, b) => { });
var port = TestServer.GetNextPort(); var port = TestServer.GetNextPort();
var address = ServerAddress.FromUrl($"http://localhost:{port}/"); var address = ServerAddress.FromUrl($"http://localhost:{port}/");
tcp.Bind(address); tcp.Bind(address);
tcp.Listen(10, (_, status, error, state) => tcp.Listen(10, (_, status, error, state) =>
{ {
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle(_logger); var tcp2 = new UvTcpHandle(_logger);
tcp2.Init(loop); tcp2.Init(loop, (a, b) => { });
tcp.Accept(tcp2); tcp.Accept(tcp2);
var data = Marshal.AllocCoTaskMem(500); var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart( tcp2.ReadStart(
@ -227,7 +224,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
null); null);
tcp.Dispose(); tcp.Dispose();
}, null); }, null);
Console.WriteLine("Task.Run");
var t = Task.Run(async () => var t = Task.Run(async () =>
{ {
var socket = new Socket( var socket = new Socket(
@ -268,9 +264,6 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
#else #else
var count = await socket.ReceiveAsync(new[] { buffer }, SocketFlags.None); var count = await socket.ReceiveAsync(new[] { buffer }, SocketFlags.None);
#endif #endif
Console.WriteLine("count {0} {1}",
count,
System.Text.Encoding.ASCII.GetString(buffer.Array, 0, count));
if (count <= 0) break; if (count <= 0) break;
} }
socket.Dispose(); socket.Dispose();