diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index 8560a7bbbd..8dffb3fc1f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -74,11 +74,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize) { - return new Libuv.uv_buf_t - { - memory = SocketInput.Pin(2048), - len = 2048 - }; + return handle.Libuv.buf_init( + SocketInput.Pin(2048), + 2048); } private void OnRead(UvStreamHandle handle, int nread) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index 588966e005..d6e20e0f3a 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -62,7 +62,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { ListenSocket = new UvTcpHandle(); ListenSocket.Init(Thread.Loop); - ListenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, port)); + ListenSocket.Bind(new IPEndPoint(IPAddress.Any, port)); ListenSocket.Listen(10, _connectionCallback, this); tcs.SetResult(0); } @@ -86,7 +86,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public void Dispose() { - Console.WriteLine("Listener.Dispose"); var tcs = new TaskCompletionSource(); Thread.Post( _ => diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index fb1aa9cc0b..fa0b0a6ab3 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -28,6 +28,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public void Write(ArraySegment buffer, Action callback, object state) { + //TODO: need buffering that works + var copy = new byte[buffer.Count]; + Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count); + buffer = new ArraySegment(copy); + KestrelTrace.Log.ConnectionWrite(0, buffer.Count); var req = new ThisWriteReq(); req.Init(_thread.Loop); @@ -70,24 +75,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public void Write() { - _pin = GCHandle.Alloc(_buffer.Array, GCHandleType.Pinned); - var buf = new Libuv.uv_buf_t - { - len = (uint)_buffer.Count, - memory = _pin.AddrOfPinnedObject() + _buffer.Offset - }; - Write( _socket, - new[] { buf }, - 1, + new ArraySegment>( + new[]{_buffer}), _writeCallback, this); } private void OnWrite(UvWriteReq req, int status) { - _pin.Free(); KestrelTrace.Log.ConnectionWriteCallback(0, status); //NOTE: pool this? Dispose(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 6929767280..a59f7ba42e 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -81,6 +81,17 @@ namespace Microsoft.AspNet.Server.Kestrel _post.Send(); } + public Task PostAsync(Action callback, object state) + { + var tcs = new TaskCompletionSource(); + lock (_workSync) + { + _workAdding.Enqueue(new Work { Callback = callback, State = state, Completion = tcs }); + } + _post.Send(); + return tcs.Task; + } + private void ThreadStart(object parameter) { var tcs = (TaskCompletionSource)parameter; @@ -103,12 +114,19 @@ namespace Microsoft.AspNet.Server.Kestrel return; } - // run the loop one more time to delete the _post handle + // run the loop one more time to delete the open handles _post.Reference(); _post.DangerousClose(); + _engine.Libuv.walk( + _loop, + (ptr, arg) => + { + var handle = UvMemory.FromIntPtr(ptr); + handle.Dispose(); + }, + IntPtr.Zero); var ran2 = _loop.Run(); - // delete the last of the unmanaged memory _loop.Dispose(); } catch (Exception ex) @@ -131,10 +149,26 @@ namespace Microsoft.AspNet.Server.Kestrel try { work.Callback(work.State); + if (work.Completion != null) + { + ThreadPool.QueueUserWorkItem( + tcs => + { + ((TaskCompletionSource)tcs).SetResult(0); + }, + work.Completion); + } } catch (Exception ex) { - //TODO: unhandled exceptions + if (work.Completion != null) + { + ThreadPool.QueueUserWorkItem(_ => work.Completion.SetException(ex), null); + } + else + { + // TODO: unobserved exception? + } } } } @@ -143,6 +177,7 @@ namespace Microsoft.AspNet.Server.Kestrel { public Action Callback; public object State; + public TaskCompletionSource Completion; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelTrace.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelTrace.cs index 907567bb82..6aa018b6bf 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelTrace.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelTrace.cs @@ -1,83 +1,83 @@ using System; -using System.Diagnostics.Tracing; +//using System.Diagnostics.Tracing; namespace Microsoft.AspNet.Server.Kestrel { /// /// Summary description for KestrelTrace /// - public class KestrelTrace : EventSource + public class KestrelTrace //: EventSource { public static KestrelTrace Log = new KestrelTrace(); - static EventTask Connection = (EventTask)1; - static EventTask Frame = (EventTask)1; + // static EventTask Connection = (EventTask)1; + // static EventTask Frame = (EventTask)1; - [Event(13, Level = EventLevel.Informational, Message = "Id {0}")] + // [Event(13, Level = EventLevel.Informational, Message = "Id {0}")] public void ConnectionStart(long connectionId) { - WriteEvent(13, connectionId); + // WriteEvent(13, connectionId); } - [Event(14, Level = EventLevel.Informational, Message = "Id {0}")] + // [Event(14, Level = EventLevel.Informational, Message = "Id {0}")] public void ConnectionStop(long connectionId) { - WriteEvent(14, connectionId); + // WriteEvent(14, connectionId); } - [Event(4, Message = "Id {0} Status {1}")] + // [Event(4, Message = "Id {0} Status {1}")] internal void ConnectionRead(long connectionId, int status) { - WriteEvent(4, connectionId, status); + // WriteEvent(4, connectionId, status); } - [Event(5, Message = "Id {0}")] + // [Event(5, Message = "Id {0}")] internal void ConnectionPause(long connectionId) { - WriteEvent(5, connectionId); + // WriteEvent(5, connectionId); } - [Event(6, Message = "Id {0}")] + // [Event(6, Message = "Id {0}")] internal void ConnectionResume(long connectionId) { - WriteEvent(6, connectionId); + // WriteEvent(6, connectionId); } - [Event(7, Message = "Id {0}")] + // [Event(7, Message = "Id {0}")] internal void ConnectionReadFin(long connectionId) { - WriteEvent(7, connectionId); + // WriteEvent(7, connectionId); } - [Event(8, Message = "Id {0} Step {1}")] +// [Event(8, Message = "Id {0} Step {1}")] internal void ConnectionWriteFin(long connectionId, int step) { - WriteEvent(8, connectionId, step); + // WriteEvent(8, connectionId, step); } - [Event(9, Message = "Id {0}")] + // [Event(9, Message = "Id {0}")] internal void ConnectionKeepAlive(long connectionId) { - WriteEvent(9, connectionId); + // WriteEvent(9, connectionId); } - [Event(10, Message = "Id {0}")] + // [Event(10, Message = "Id {0}")] internal void ConnectionDisconnect(long connectionId) { - WriteEvent(10, connectionId); + // WriteEvent(10, connectionId); } - [Event(11, Message = "Id {0} Count {1}")] + // [Event(11, Message = "Id {0} Count {1}")] internal void ConnectionWrite(long connectionId, int count) { - WriteEvent(11, connectionId, count); + // WriteEvent(11, connectionId, count); } - [Event(12, Message = "Id {0} Status {1}")] + // [Event(12, Message = "Id {0} Status {1}")] internal void ConnectionWriteCallback(long connectionId, int status) { - WriteEvent(12, connectionId, status); + // WriteEvent(12, connectionId, status); } } } diff --git a/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs index 8873de5041..2e821c1af2 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs @@ -35,12 +35,23 @@ namespace Microsoft.AspNet.Server.KestralTests { get { - var services = CallContextServiceLocator.Locator.ServiceProvider; - return (ILibraryManager)services.GetService(typeof(ILibraryManager)); + try + { + var locator = CallContextServiceLocator.Locator; + if (locator == null) + { + return null; + } + var services = locator.ServiceProvider; + if (services == null) + { + return null; + } + return (ILibraryManager)services.GetService(typeof(ILibraryManager)); + } catch (NullReferenceException) { return null; } } } - private async Task AppChunked(Frame frame) { var data = new MemoryStream(); @@ -85,6 +96,7 @@ namespace Microsoft.AspNet.Server.KestralTests engine.Start(1); var started = engine.CreateServer("http", "localhost", 54321, App); + Console.WriteLine("Started"); var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); socket.Connect(new IPEndPoint(IPAddress.Loopback, 54321)); socket.Send(Encoding.ASCII.GetBytes("POST / HTTP/1.0\r\n\r\nHello World")); diff --git a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs index f2994a1e85..19b0178c14 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs @@ -30,7 +30,16 @@ namespace Microsoft.AspNet.Server.KestralTests { get { - var services = CallContextServiceLocator.Locator.ServiceProvider; + var locator = CallContextServiceLocator.Locator; + if (locator == null) + { + return null; + } + var services = locator.ServiceProvider; + if (services == null) + { + return null; + } return (ILibraryManager)services.GetService(typeof(ILibraryManager)); } } @@ -123,12 +132,13 @@ namespace Microsoft.AspNet.Server.KestralTests tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321)); tcp.Listen(10, (_, status, state) => { + Console.WriteLine("Connected"); var tcp2 = new UvTcpHandle(); tcp2.Init(loop); tcp.Accept(tcp2); var data = Marshal.AllocCoTaskMem(500); tcp2.ReadStart( - (a, b, c) => new Libuv.uv_buf_t { memory = data, len = 500 }, + (a, b, c) => _uv.buf_init(data, 500), (__, nread, state2) => { bytesRead += nread; @@ -140,6 +150,7 @@ namespace Microsoft.AspNet.Server.KestralTests null); tcp.Dispose(); }, null); + Console.WriteLine("Task.Run"); var t = Task.Run(async () => { var socket = new Socket( @@ -165,5 +176,92 @@ namespace Microsoft.AspNet.Server.KestralTests loop.Dispose(); await t; } + + [Fact] + public async Task SocketCanReadAndWrite() + { + int bytesRead = 0; + var loop = new UvLoopHandle(); + loop.Init(_uv); + var tcp = new UvTcpHandle(); + tcp.Init(loop); + tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321)); + tcp.Listen(10, (_, status, state) => + { + Console.WriteLine("Connected"); + var tcp2 = new UvTcpHandle(); + tcp2.Init(loop); + tcp.Accept(tcp2); + var data = Marshal.AllocCoTaskMem(500); + tcp2.ReadStart( + (a, b, c) => tcp2.Libuv.buf_init(data, 500), + (__, nread, state2) => + { + bytesRead += nread; + if (nread == 0) + { + tcp2.Dispose(); + } + else + { + for (var x = 0; x != 2; ++x) + { + var req = new UvWriteReq(); + req.Init(loop); + req.Write( + tcp2, + new ArraySegment>( + new[]{new ArraySegment(new byte[]{65,66,67,68,69})} + ), + (___,____,_____)=>{}, + null); + } + } + }, + null); + tcp.Dispose(); + }, null); + Console.WriteLine("Task.Run"); + var t = Task.Run(async () => + { + var socket = new Socket( + AddressFamily.InterNetwork, + SocketType.Stream, + ProtocolType.Tcp); + await Task.Factory.FromAsync( + socket.BeginConnect, + socket.EndConnect, + new IPEndPoint(IPAddress.Loopback, 54321), + null, + TaskCreationOptions.None); + await Task.Factory.FromAsync( + socket.BeginSend, + socket.EndSend, + new[] { new ArraySegment(new byte[] { 1, 2, 3, 4, 5 }) }, + SocketFlags.None, + null, + TaskCreationOptions.None); + socket.Shutdown(SocketShutdown.Send); + var buffer = new ArraySegment(new byte[2048]); + for(;;) + { + var count = await Task.Factory.FromAsync( + socket.BeginReceive, + socket.EndReceive, + new[] { buffer }, + SocketFlags.None, + null, + TaskCreationOptions.None); + Console.WriteLine("count {0} {1}", + count, + System.Text.Encoding.ASCII.GetString(buffer.Array, 0, count)); + if (count <= 0) break; + } + socket.Dispose(); + }); + loop.Run(); + loop.Dispose(); + await t; + } } } \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestralTests/TestServer.cs b/test/Microsoft.AspNet.Server.KestralTests/TestServer.cs index e947f6b866..828c60ce67 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/TestServer.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/TestServer.cs @@ -24,8 +24,19 @@ namespace Microsoft.AspNet.Server.KestralTests { get { - var services = CallContextServiceLocator.Locator.ServiceProvider; - return (ILibraryManager)services.GetService(typeof(ILibraryManager)); + try{ + var locator = CallContextServiceLocator.Locator; + if (locator == null) + { + return null; + } + var services = locator.ServiceProvider; + if (services == null) + { + return null; + } + return (ILibraryManager)services.GetService(typeof(ILibraryManager)); + } catch (NullReferenceException) { return null; } } }