Adding an engine, a thread manager, an accept loop

This commit is contained in:
Louis DeJardin 2014-06-03 16:41:55 -07:00
parent 2334fe6dc8
commit c9d6db14bc
10 changed files with 446 additions and 53 deletions

View File

@ -0,0 +1,83 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel
{
/// <summary>
/// Summary description for Accept
/// </summary>
public class Listener : IDisposable
{
private readonly KestrelThread _thread;
UvTcpHandle _socket;
private readonly Action<UvStreamHandle, int, object> _connectionCallback = ConnectionCallback;
private static void ConnectionCallback(UvStreamHandle stream, int status, object state)
{
((Listener)state).OnConnection(stream, status);
}
public Listener(KestrelThread thread)
{
_thread = thread;
}
public Task StartAsync()
{
var tcs = new TaskCompletionSource<int>();
_thread.Post(OnStart, tcs);
return tcs.Task;
}
public void OnStart(object parameter)
{
var tcs = (TaskCompletionSource<int>)parameter;
try
{
_socket = new UvTcpHandle();
_socket.Init(_thread.Loop);
_socket.Bind(new IPEndPoint(IPAddress.Any, 4001));
_socket.Listen(10, _connectionCallback, this);
tcs.SetResult(0);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
}
private void OnConnection(UvStreamHandle socket, int status)
{
var connection = new UvTcpHandle();
connection.Init(_thread.Loop);
socket.Accept(connection);
connection.ReadStart(OnRead, null);
}
private void OnRead(UvStreamHandle socket, int count, byte[] data, object _)
{
var text = Encoding.UTF8.GetString(data);
if (count <= 0)
{
socket.Close();
}
}
public void Dispose()
{
var socket = _socket;
_socket = null;
_thread.Post(OnDispose, socket);
}
private void OnDispose(object socket)
{
((UvHandle)socket).Close();
}
}
}

View File

@ -0,0 +1,43 @@
using System;
namespace Microsoft.AspNet.Server.Kestrel
{
/// <summary>
/// Summary description for Disposable
/// </summary>
public class Disposable : IDisposable
{
private Action _dispose;
public Disposable(Action dispose)
{
_dispose = dispose;
}
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_dispose.Invoke();
}
_dispose = null;
disposedValue = true;
}
}
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
}

View File

@ -0,0 +1,130 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel
{
/// <summary>
/// Summary description for KestrelThread
/// </summary>
public class KestrelThread
{
KestrelEngine _engine;
Thread _thread;
UvLoopHandle _loop;
UvAsyncHandle _post;
Queue<Work> _workAdding = new Queue<Work>();
Queue<Work> _workRunning = new Queue<Work>();
object _workSync = new Object();
bool _stopImmediate = false;
public KestrelThread(KestrelEngine engine)
{
_engine = engine;
_loop = new UvLoopHandle();
_post = new UvAsyncHandle();
_thread = new Thread(ThreadStart);
}
public UvLoopHandle Loop { get { return _loop; } }
public Task StartAsync()
{
var tcs = new TaskCompletionSource<int>();
_thread.Start(tcs);
return tcs.Task;
}
public void Stop(TimeSpan timeout)
{
Post(OnStop, null);
if (!_thread.Join(timeout))
{
Post(OnStopImmediate, null);
if (!_thread.Join(timeout))
{
_thread.Abort();
}
}
}
private void OnStop(object obj)
{
_post.Unreference();
}
private void OnStopImmediate(object obj)
{
_stopImmediate = true;
_loop.Stop();
}
public void Post(Action<object> callback, object state)
{
lock (_workSync)
{
_workAdding.Enqueue(new Work { Callback = callback, State = state });
}
_post.Send();
}
private void ThreadStart(object parameter)
{
var tcs = (TaskCompletionSource<int>)parameter;
try
{
_loop.Init(_engine.Libuv);
_post.Init(_loop, OnPost);
tcs.SetResult(0);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
var ran1 = _loop.Run();
if (_stopImmediate)
{
// thread-abort form of exit, resources will be leaked
return;
}
// run the loop one more time to delete the _post handle
_post.Reference();
_post.Close();
var ran2 = _loop.Run();
// delete the last of the unmanaged memory
_loop.Close();
}
private void OnPost()
{
var queue = _workAdding;
lock (_workSync)
{
_workAdding = _workRunning;
}
_workRunning = queue;
while (queue.Count != 0)
{
var work = queue.Dequeue();
try
{
work.Callback(work.State);
}
catch (Exception ex)
{
//TODO: unhandled exceptions
}
}
}
private struct Work
{
public Action<object> Callback;
public object State;
}
}
}

View File

@ -0,0 +1,65 @@
using System;
using Microsoft.AspNet.Server.Kestrel.Networking;
using System.Threading;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel
{
public class KestrelEngine
{
public KestrelEngine()
{
Threads = new List<KestrelThread>();
Listeners = new List<Listener>();
Libuv = new Libuv();
Libuv.Load("libuv.dll");
}
public Libuv Libuv { get; private set; }
public List<KestrelThread> Threads { get; private set; }
public List<Listener> Listeners { get; private set; }
public void Start(int count)
{
for (var index = 0; index != count; ++index)
{
Threads.Add(new KestrelThread(this));
}
foreach (var thread in Threads)
{
thread.StartAsync().Wait();
}
}
public void Stop()
{
foreach (var thread in Threads)
{
thread.Stop(TimeSpan.FromSeconds(45));
}
Threads.Clear();
}
public IDisposable CreateServer()
{
var listeners = new List<Listener>();
foreach (var thread in Threads)
{
var listener = new Listener(thread);
listener.StartAsync().Wait();
listeners.Add(listener);
}
return new Disposable(() =>
{
foreach (var listener in listeners)
{
listener.Dispose();
}
});
}
}
}

View File

@ -78,6 +78,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
_uv_stop(handle);
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
delegate void uv_ref(UvHandle handle);
uv_ref _uv_ref;
public void @ref(UvHandle handle)
{
_uv_ref(handle);
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
delegate void uv_unref(UvHandle handle);
uv_unref _uv_unref;
public void unref(UvHandle handle)
{
_uv_unref(handle);
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void uv_close_cb(IntPtr handle);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]

View File

@ -12,9 +12,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
unsafe static void AsyncCb(IntPtr handle)
{
GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle);
var self = (UvAsyncHandle)gcHandle.Target;
self._callback.Invoke();
FromIntPtr<UvAsyncHandle>(handle)._callback.Invoke();
}
private Action _callback;

View File

@ -54,5 +54,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
}
Marshal.FreeCoTaskMem(memory);
}
unsafe public static THandle FromIntPtr<THandle>(IntPtr handle)
{
GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle);
return (THandle)gcHandle.Target;
}
public void Reference()
{
_uv.@ref(this);
}
public void Unreference()
{
_uv.unref(this);
}
}
}

View File

@ -8,24 +8,25 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
public abstract class UvStreamHandle : UvHandle
{
private Libuv.uv_connection_cb _connection_cb;
private Libuv.uv_alloc_cb _alloc_cb;
private Libuv.uv_read_cb _read_cb;
private readonly static Libuv.uv_connection_cb _uv_connection_cb = UvConnectionCb;
private readonly static Libuv.uv_alloc_cb _uv_alloc_cb = UvAllocCb;
private readonly static Libuv.uv_read_cb _uv_read_cb = UvReadCb;
private Action<int, UvStreamHandle> _connection;
private Action<int, UvStreamHandle> _alloc;
private Action<int, byte[], UvStreamHandle> _read;
public Action<UvStreamHandle, int, object> _connectionCallback;
public object _connectionState;
public void Listen(int backlog, Action<int, UvStreamHandle> connection)
public Action<UvStreamHandle, int, byte[], object> _readCallback;
public object _readState;
public UvStreamHandle()
{
_connection_cb = OnConnection;
_connection = connection;
_uv.listen(this, 10, _connection_cb);
}
public void OnConnection(IntPtr server, int status)
public void Listen(int backlog, Action<UvStreamHandle, int, object> callback, object state)
{
_connection(status, this);
_connectionCallback = callback;
_connectionState = state;
_uv.listen(this, 10, _uv_connection_cb);
}
public void Accept(UvStreamHandle handle)
@ -33,42 +34,52 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
_uv.accept(this, handle);
}
public void ReadStart(Action<int, byte[], UvStreamHandle> read)
public void ReadStart(
Action<UvStreamHandle, int, byte[], object> callback,
object state)
{
_alloc_cb = OnAlloc;
_read_cb = OnRead;
_read = read;
_uv.read_start(this, _alloc_cb, _read_cb);
}
private void OnAlloc(IntPtr server, int suggested_size, out Libuv.uv_buf_t buf)
{
buf = new Libuv.uv_buf_t
{
memory = Marshal.AllocCoTaskMem(suggested_size),
len = (uint)suggested_size,
};
}
private void OnRead(IntPtr server, int nread, ref Libuv.uv_buf_t buf)
{
if (nread == -4095)
{
_read(0, null, this);
Marshal.FreeCoTaskMem(buf.memory);
return;
}
var length = _uv.Check(nread);
var data = new byte[length];
Marshal.Copy(buf.memory, data, 0, length);
Marshal.FreeCoTaskMem(buf.memory);
_read(length, data, this);
_readCallback = callback;
_readState = state;
_uv.read_start(this, _uv_alloc_cb, _uv_read_cb);
}
public void ReadStop()
{
_uv.read_stop(this);
}
private static void UvConnectionCb(IntPtr handle, int status)
{
var stream = FromIntPtr<UvStreamHandle>(handle);
stream._connectionCallback(stream, status, stream._connectionState);
}
private static void UvAllocCb(IntPtr server, int suggested_size, out Libuv.uv_buf_t buf)
{
buf = new Libuv.uv_buf_t
{
memory = Marshal.AllocCoTaskMem(suggested_size),
len = (uint)suggested_size,
};
}
private static void UvReadCb(IntPtr ptr, int nread, ref Libuv.uv_buf_t buf)
{
var stream = FromIntPtr<UvStreamHandle>(ptr);
if (nread == -4095)
{
stream._readCallback(stream, 0, null, stream._readState);
Marshal.FreeCoTaskMem(buf.memory);
return;
}
var length = stream._uv.Check(nread);
var data = new byte[length];
Marshal.Copy(buf.memory, data, 0, length);
Marshal.FreeCoTaskMem(buf.memory);
stream._readCallback(stream, length, data, stream._readState);
}
}
}

View File

@ -0,0 +1,30 @@
using Microsoft.AspNet.Server.Kestrel;
using System;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
{
/// <summary>
/// Summary description for EngineTests
/// </summary>
public class EngineTests
{
[Fact]
public async Task EngineCanStartAndStop()
{
var engine = new KestrelEngine();
engine.Start(1);
engine.Stop();
}
[Fact]
public async Task ListenerCanCreateAndDispose()
{
var engine = new KestrelEngine();
engine.Start(1);
var started = engine.CreateServer();
started.Dispose();
engine.Stop();
}
}
}

View File

@ -71,14 +71,14 @@ namespace Microsoft.AspNet.Server.KestralTests
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (status, handle) =>
tcp.Listen(10, (stream, status, state) =>
{
var tcp2 = new UvTcpHandle();
tcp2.Init(loop);
tcp.Accept(tcp2);
stream.Accept(tcp2);
tcp2.Close();
tcp.Close();
});
stream.Close();
}, null);
var t = Task.Run(async () =>
{
var socket = new Socket(
@ -108,21 +108,21 @@ namespace Microsoft.AspNet.Server.KestralTests
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (status, handle) =>
tcp.Listen(10, (_, status, state) =>
{
var tcp2 = new UvTcpHandle();
tcp2.Init(loop);
tcp.Accept(tcp2);
tcp2.ReadStart((nread, data, handle2) =>
tcp2.ReadStart((__, nread, data, state2) =>
{
bytesRead += nread;
if (nread == 0)
{
tcp2.Close();
}
});
}, null);
tcp.Close();
});
}, null);
var t = Task.Run(async () =>
{
var socket = new Socket(