diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs new file mode 100644 index 0000000000..720db481fe --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -0,0 +1,83 @@ +using Microsoft.AspNet.Server.Kestrel.Networking; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel +{ + /// + /// Summary description for Accept + /// + public class Listener : IDisposable + { + private readonly KestrelThread _thread; + UvTcpHandle _socket; + private readonly Action _connectionCallback = ConnectionCallback; + + private static void ConnectionCallback(UvStreamHandle stream, int status, object state) + { + ((Listener)state).OnConnection(stream, status); + } + + public Listener(KestrelThread thread) + { + _thread = thread; + } + + public Task StartAsync() + { + var tcs = new TaskCompletionSource(); + _thread.Post(OnStart, tcs); + return tcs.Task; + } + + public void OnStart(object parameter) + { + var tcs = (TaskCompletionSource)parameter; + try + { + _socket = new UvTcpHandle(); + _socket.Init(_thread.Loop); + _socket.Bind(new IPEndPoint(IPAddress.Any, 4001)); + _socket.Listen(10, _connectionCallback, this); + tcs.SetResult(0); + } + catch (Exception ex) + { + tcs.SetException(ex); + } + } + + private void OnConnection(UvStreamHandle socket, int status) + { + var connection = new UvTcpHandle(); + connection.Init(_thread.Loop); + socket.Accept(connection); + connection.ReadStart(OnRead, null); + } + + private void OnRead(UvStreamHandle socket, int count, byte[] data, object _) + { + var text = Encoding.UTF8.GetString(data); + if (count <= 0) + { + socket.Close(); + } + } + + public void Dispose() + { + var socket = _socket; + _socket = null; + _thread.Post(OnDispose, socket); + } + + private void OnDispose(object socket) + { + ((UvHandle)socket).Close(); + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs new file mode 100644 index 0000000000..9bb2a7f4bd --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs @@ -0,0 +1,43 @@ +using System; + +namespace Microsoft.AspNet.Server.Kestrel +{ + /// + /// Summary description for Disposable + /// + public class Disposable : IDisposable + { + private Action _dispose; + + public Disposable(Action dispose) + { + _dispose = dispose; + } + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + _dispose.Invoke(); + } + + _dispose = null; + disposedValue = true; + } + } + + // This code added to correctly implement the disposable pattern. + public void Dispose() + { + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + GC.SuppressFinalize(this); + } + #endregion + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs new file mode 100644 index 0000000000..83dd84a4a5 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -0,0 +1,130 @@ +using Microsoft.AspNet.Server.Kestrel.Networking; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel +{ + /// + /// Summary description for KestrelThread + /// + public class KestrelThread + { + KestrelEngine _engine; + Thread _thread; + UvLoopHandle _loop; + UvAsyncHandle _post; + Queue _workAdding = new Queue(); + Queue _workRunning = new Queue(); + object _workSync = new Object(); + bool _stopImmediate = false; + + public KestrelThread(KestrelEngine engine) + { + _engine = engine; + _loop = new UvLoopHandle(); + _post = new UvAsyncHandle(); + _thread = new Thread(ThreadStart); + } + + public UvLoopHandle Loop { get { return _loop; } } + + public Task StartAsync() + { + var tcs = new TaskCompletionSource(); + _thread.Start(tcs); + return tcs.Task; + } + + public void Stop(TimeSpan timeout) + { + Post(OnStop, null); + if (!_thread.Join(timeout)) + { + Post(OnStopImmediate, null); + if (!_thread.Join(timeout)) + { + _thread.Abort(); + } + } + } + + private void OnStop(object obj) + { + _post.Unreference(); + } + + private void OnStopImmediate(object obj) + { + _stopImmediate = true; + _loop.Stop(); + } + + public void Post(Action callback, object state) + { + lock (_workSync) + { + _workAdding.Enqueue(new Work { Callback = callback, State = state }); + } + _post.Send(); + } + + private void ThreadStart(object parameter) + { + var tcs = (TaskCompletionSource)parameter; + try + { + _loop.Init(_engine.Libuv); + _post.Init(_loop, OnPost); + tcs.SetResult(0); + } + catch (Exception ex) + { + tcs.SetException(ex); + } + var ran1 = _loop.Run(); + if (_stopImmediate) + { + // thread-abort form of exit, resources will be leaked + return; + } + + // run the loop one more time to delete the _post handle + _post.Reference(); + _post.Close(); + var ran2 = _loop.Run(); + + // delete the last of the unmanaged memory + _loop.Close(); + } + + private void OnPost() + { + var queue = _workAdding; + lock (_workSync) + { + _workAdding = _workRunning; + } + _workRunning = queue; + while (queue.Count != 0) + { + var work = queue.Dequeue(); + try + { + work.Callback(work.State); + } + catch (Exception ex) + { + //TODO: unhandled exceptions + } + } + } + + private struct Work + { + public Action Callback; + public object State; + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs new file mode 100644 index 0000000000..b404d6eb63 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs @@ -0,0 +1,65 @@ + +using System; +using Microsoft.AspNet.Server.Kestrel.Networking; +using System.Threading; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel +{ + public class KestrelEngine + { + + public KestrelEngine() + { + Threads = new List(); + Listeners = new List(); + Libuv = new Libuv(); + Libuv.Load("libuv.dll"); + } + + public Libuv Libuv { get; private set; } + public List Threads { get; private set; } + public List Listeners { get; private set; } + + public void Start(int count) + { + for (var index = 0; index != count; ++index) + { + Threads.Add(new KestrelThread(this)); + } + + foreach (var thread in Threads) + { + thread.StartAsync().Wait(); + } + } + + public void Stop() + { + foreach (var thread in Threads) + { + thread.Stop(TimeSpan.FromSeconds(45)); + } + Threads.Clear(); + } + + public IDisposable CreateServer() + { + var listeners = new List(); + foreach (var thread in Threads) + { + var listener = new Listener(thread); + listener.StartAsync().Wait(); + listeners.Add(listener); + } + return new Disposable(() => + { + foreach (var listener in listeners) + { + listener.Dispose(); + } + }); + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs index 2431574d4e..17a2299a44 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs @@ -78,6 +78,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _uv_stop(handle); } + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate void uv_ref(UvHandle handle); + uv_ref _uv_ref; + public void @ref(UvHandle handle) + { + _uv_ref(handle); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate void uv_unref(UvHandle handle); + uv_unref _uv_unref; + public void unref(UvHandle handle) + { + _uv_unref(handle); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_close_cb(IntPtr handle); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UcAsyncHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UcAsyncHandle.cs index 6d8f919abb..d09b815e75 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UcAsyncHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UcAsyncHandle.cs @@ -12,9 +12,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking unsafe static void AsyncCb(IntPtr handle) { - GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle); - var self = (UvAsyncHandle)gcHandle.Target; - self._callback.Invoke(); + FromIntPtr(handle)._callback.Invoke(); } private Action _callback; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs index d03240e45c..033c649484 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs @@ -54,5 +54,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking } Marshal.FreeCoTaskMem(memory); } + + unsafe public static THandle FromIntPtr(IntPtr handle) + { + GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle); + return (THandle)gcHandle.Target; + } + + public void Reference() + { + _uv.@ref(this); + } + + public void Unreference() + { + _uv.unref(this); + } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs index 1436852863..e4d33aa849 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs @@ -8,24 +8,25 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking { public abstract class UvStreamHandle : UvHandle { - private Libuv.uv_connection_cb _connection_cb; - private Libuv.uv_alloc_cb _alloc_cb; - private Libuv.uv_read_cb _read_cb; + private readonly static Libuv.uv_connection_cb _uv_connection_cb = UvConnectionCb; + private readonly static Libuv.uv_alloc_cb _uv_alloc_cb = UvAllocCb; + private readonly static Libuv.uv_read_cb _uv_read_cb = UvReadCb; - private Action _connection; - private Action _alloc; - private Action _read; + public Action _connectionCallback; + public object _connectionState; - public void Listen(int backlog, Action connection) + public Action _readCallback; + public object _readState; + + public UvStreamHandle() { - _connection_cb = OnConnection; - _connection = connection; - _uv.listen(this, 10, _connection_cb); } - public void OnConnection(IntPtr server, int status) + public void Listen(int backlog, Action callback, object state) { - _connection(status, this); + _connectionCallback = callback; + _connectionState = state; + _uv.listen(this, 10, _uv_connection_cb); } public void Accept(UvStreamHandle handle) @@ -33,42 +34,52 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _uv.accept(this, handle); } - public void ReadStart(Action read) + public void ReadStart( + Action callback, + object state) { - _alloc_cb = OnAlloc; - _read_cb = OnRead; - _read = read; - _uv.read_start(this, _alloc_cb, _read_cb); - } - - private void OnAlloc(IntPtr server, int suggested_size, out Libuv.uv_buf_t buf) - { - buf = new Libuv.uv_buf_t - { - memory = Marshal.AllocCoTaskMem(suggested_size), - len = (uint)suggested_size, - }; - } - - private void OnRead(IntPtr server, int nread, ref Libuv.uv_buf_t buf) - { - if (nread == -4095) - { - _read(0, null, this); - Marshal.FreeCoTaskMem(buf.memory); - return; - } - var length = _uv.Check(nread); - var data = new byte[length]; - Marshal.Copy(buf.memory, data, 0, length); - Marshal.FreeCoTaskMem(buf.memory); - - _read(length, data, this); + _readCallback = callback; + _readState = state; + _uv.read_start(this, _uv_alloc_cb, _uv_read_cb); } public void ReadStop() { _uv.read_stop(this); } + + private static void UvConnectionCb(IntPtr handle, int status) + { + var stream = FromIntPtr(handle); + stream._connectionCallback(stream, status, stream._connectionState); + } + + private static void UvAllocCb(IntPtr server, int suggested_size, out Libuv.uv_buf_t buf) + { + buf = new Libuv.uv_buf_t + { + memory = Marshal.AllocCoTaskMem(suggested_size), + len = (uint)suggested_size, + }; + + } + + private static void UvReadCb(IntPtr ptr, int nread, ref Libuv.uv_buf_t buf) + { + var stream = FromIntPtr(ptr); + if (nread == -4095) + { + stream._readCallback(stream, 0, null, stream._readState); + Marshal.FreeCoTaskMem(buf.memory); + return; + } + + var length = stream._uv.Check(nread); + var data = new byte[length]; + Marshal.Copy(buf.memory, data, 0, length); + Marshal.FreeCoTaskMem(buf.memory); + + stream._readCallback(stream, length, data, stream._readState); + } } } diff --git a/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs new file mode 100644 index 0000000000..3287054606 --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs @@ -0,0 +1,30 @@ +using Microsoft.AspNet.Server.Kestrel; +using System; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.AspNet.Server.KestralTests +{ + /// + /// Summary description for EngineTests + /// + public class EngineTests + { + [Fact] + public async Task EngineCanStartAndStop() + { + var engine = new KestrelEngine(); + engine.Start(1); + engine.Stop(); + } + [Fact] + public async Task ListenerCanCreateAndDispose() + { + var engine = new KestrelEngine(); + engine.Start(1); + var started = engine.CreateServer(); + started.Dispose(); + engine.Stop(); + } + } +} \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs index 90e4168e28..2f17803fd4 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs @@ -71,14 +71,14 @@ namespace Microsoft.AspNet.Server.KestralTests var tcp = new UvTcpHandle(); tcp.Init(loop); tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321)); - tcp.Listen(10, (status, handle) => + tcp.Listen(10, (stream, status, state) => { var tcp2 = new UvTcpHandle(); tcp2.Init(loop); - tcp.Accept(tcp2); + stream.Accept(tcp2); tcp2.Close(); - tcp.Close(); - }); + stream.Close(); + }, null); var t = Task.Run(async () => { var socket = new Socket( @@ -108,21 +108,21 @@ namespace Microsoft.AspNet.Server.KestralTests var tcp = new UvTcpHandle(); tcp.Init(loop); tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321)); - tcp.Listen(10, (status, handle) => + tcp.Listen(10, (_, status, state) => { var tcp2 = new UvTcpHandle(); tcp2.Init(loop); tcp.Accept(tcp2); - tcp2.ReadStart((nread, data, handle2) => + tcp2.ReadStart((__, nread, data, state2) => { bytesRead += nread; if (nread == 0) { tcp2.Close(); } - }); + }, null); tcp.Close(); - }); + }, null); var t = Task.Run(async () => { var socket = new Socket(