diff --git a/src/Kestrel/ServerFactory.cs b/src/Kestrel/ServerFactory.cs index 1522c61df8..ac60b4c4f2 100644 --- a/src/Kestrel/ServerFactory.cs +++ b/src/Kestrel/ServerFactory.cs @@ -38,7 +38,7 @@ namespace Kestrel var disposables = new List(); var information = (ServerInformation)serverInformation; var engine = new KestrelEngine(_libraryManager, _appShutdownService); - engine.Start(1); + engine.Start(Environment.ProcessorCount); foreach (var address in information.Addresses) { disposables.Add(engine.CreateServer( diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index c48e43b18a..64fca769dc 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -53,7 +53,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http ListenSocket = new UvTcpHandle(); ListenSocket.Init(Thread.Loop, Thread.QueueCloseHandle); ListenSocket.Bind(new IPEndPoint(IPAddress.Any, port)); - ListenSocket.Listen(10, _connectionCallback, this); + ListenSocket.Listen(128, _connectionCallback, this); tcs.SetResult(0); } catch (Exception ex) @@ -64,13 +64,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http return tcs.Task; } - private void OnConnection(UvStreamHandle listenSocket, int status) + protected void OnConnection(UvStreamHandle listenSocket, int status) { var acceptSocket = new UvTcpHandle(); acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle); listenSocket.Accept(acceptSocket); - var connection = new Connection(this, acceptSocket); + DispatchConnection(acceptSocket); + } + + protected virtual void DispatchConnection(UvTcpHandle socket) + { + var connection = new Connection(this, socket); connection.Start(); } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs new file mode 100644 index 0000000000..2de5c44ce5 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs @@ -0,0 +1,88 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Server.Kestrel.Networking; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class ListenerPrimary : Listener + { + UvPipeHandle ListenPipe { get; set; } + + List _dispatchPipes = new List(); + int _dispatchIndex; + ArraySegment> _1234 = new ArraySegment>(new[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); + + public ListenerPrimary(IMemoryPool memory) : base(memory) + { + } + + public async Task StartAsync( + string pipeName, + string scheme, + string host, + int port, + KestrelThread thread, + Func application) + { + await StartAsync(scheme, host, port, thread, application); + + await Thread.PostAsync(_ => + { + ListenPipe = new UvPipeHandle(); + ListenPipe.Init(Thread.Loop, false); + ListenPipe.Bind(pipeName); + ListenPipe.Listen(128, OnListenPipe, null); + }, null); + } + + private void OnListenPipe(UvStreamHandle pipe, int status, Exception error, object state) + { + if (status < 0) + { + return; + } + + var dispatchPipe = new UvPipeHandle(); + dispatchPipe.Init(Thread.Loop, true); + try + { + pipe.Accept(dispatchPipe); + } + catch (Exception) + { + dispatchPipe.Dispose(); + return; + } + _dispatchPipes.Add(dispatchPipe); + } + + protected override void DispatchConnection(UvTcpHandle socket) + { + var index = _dispatchIndex++ % (_dispatchPipes.Count + 1); + if (index == _dispatchPipes.Count) + { + base.DispatchConnection(socket); + } + else + { + var dispatchPipe = _dispatchPipes[index]; + var write = new UvWriteReq(); + write.Init(Thread.Loop); + write.Write2( + dispatchPipe, + _1234, + socket, + (write2, status, error, state) => + { + write.Dispose(); + ((UvTcpHandle)state).Dispose(); + }, + socket); + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerSecondary.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerSecondary.cs new file mode 100644 index 0000000000..7f827f3452 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerSecondary.cs @@ -0,0 +1,106 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Server.Kestrel.Networking; +using System; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class ListenerSecondary : ListenerContext, IDisposable + { + UvPipeHandle DispatchPipe { get; set; } + + public ListenerSecondary(IMemoryPool memory) + { + Memory = memory; + } + + public Task StartAsync( + string pipeName, + KestrelThread thread, + Func application) + { + Thread = thread; + Application = application; + + DispatchPipe = new UvPipeHandle(); + + var tcs = new TaskCompletionSource(); + Thread.Post(_ => + { + try + { + DispatchPipe.Init(Thread.Loop, true); + var connect = new UvConnectRequest(); + connect.Init(Thread.Loop); + connect.Connect( + DispatchPipe, + pipeName, + (connect2, status, error, state) => + { + connect.Dispose(); + if (error != null) + { + tcs.SetException(error); + return; + } + + try + { + var ptr = Marshal.AllocHGlobal(16); + var buf = Thread.Loop.Libuv.buf_init(ptr, 16); + + DispatchPipe.ReadStart( + (_1, _2, _3) => buf, + (_1, status2, error2, state2) => + { + if (status2 <= 0) + { + DispatchPipe.Dispose(); + Marshal.FreeHGlobal(ptr); + return; + } + + var acceptSocket = new UvTcpHandle(); + acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle); + DispatchPipe.Accept(acceptSocket); + + var connection = new Connection(this, acceptSocket); + connection.Start(); + }, + null); + + tcs.SetResult(0); + } + catch (Exception ex) + { + DispatchPipe.Dispose(); + tcs.SetException(ex); + } + }, + null); + } + catch (Exception ex) + { + DispatchPipe.Dispose(); + tcs.SetException(ex); + } + }, null); + return tcs.Task; + } + + public void Dispose() + { + // Ensure the event loop is still running. + // If the event loop isn't running and we try to wait on this Post + // to complete, then KestrelEngine will never be disposed and + // the exception that stopped the event loop will never be surfaced. + if (Thread.FatalError == null) + { + Thread.Send(_ => DispatchPipe.Dispose(), null); + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index c971eca102..b2a2348c55 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -118,6 +118,18 @@ namespace Microsoft.AspNet.Server.Kestrel return tcs.Task; } + public void Send(Action callback, object state) + { + if (_loop.ThreadId == Thread.CurrentThread.ManagedThreadId) + { + callback.Invoke(state); + } + else + { + PostAsync(callback, state).Wait(); + } + } + private void PostCloseHandle(Action callback, IntPtr handle) { lock (_workSync) diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs index 394d3a745c..45513365a0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs @@ -38,10 +38,10 @@ namespace Microsoft.AspNet.Server.Kestrel : "amd64"; libraryPath = Path.Combine( - libraryPath, + libraryPath, "native", "windows", - architecture, + architecture, "libuv.dll"); } else if (Libuv.IsDarwin) @@ -91,16 +91,37 @@ namespace Microsoft.AspNet.Server.Kestrel public IDisposable CreateServer(string scheme, string host, int port, Func application) { - var listeners = new List(); + var listeners = new List(); try { + var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); + + var single = Threads.Count == 1; + var first = true; + foreach (var thread in Threads) { - var listener = new Listener(Memory); + if (single) + { + var listener = new Listener(Memory); + listeners.Add(listener); + listener.StartAsync(scheme, host, port, thread, application).Wait(); + } + else if (first) + { + var listener = new ListenerPrimary(Memory); + listeners.Add(listener); + listener.StartAsync(pipeName, scheme, host, port, thread, application).Wait(); + } + else + { + var listener = new ListenerSecondary(Memory); + listeners.Add(listener); + listener.StartAsync(pipeName, thread, application).Wait(); + } - listeners.Add(listener); - listener.StartAsync(scheme, host, port, thread, application).Wait(); + first = false; } return new Disposable(() => { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs index 4ed7b4781a..33c0008549 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs @@ -46,7 +46,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _listenCallback = callback; _listenState = state; _listenVitality = GCHandle.Alloc(this, GCHandleType.Normal); - _uv.listen(this, 10, _uv_connection_cb); + _uv.listen(this, backlog, _uv_connection_cb); } catch { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index a97175b7c3..03eee35cd7 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -81,6 +81,54 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking } } + public unsafe void Write2( + UvStreamHandle handle, + ArraySegment> bufs, + UvStreamHandle sendHandle, + Action callback, + object state) + { + try + { + // add GCHandle to keeps this SafeHandle alive while request processing + _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); + + var pBuffers = (Libuv.uv_buf_t*)_bufs; + var nBuffers = bufs.Count; + if (nBuffers > BUFFER_COUNT) + { + // create and pin buffer array when it's larger than the pre-allocated one + var bufArray = new Libuv.uv_buf_t[nBuffers]; + var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned); + _pins.Add(gcHandle); + pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); + } + + for (var index = 0; index != nBuffers; ++index) + { + // create and pin each segment being written + var buf = bufs.Array[bufs.Offset + index]; + + var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); + _pins.Add(gcHandle); + pBuffers[index] = Libuv.buf_init( + gcHandle.AddrOfPinnedObject() + buf.Offset, + buf.Count); + } + + _callback = callback; + _state = state; + _uv.write2(this, handle, pBuffers, nBuffers, sendHandle, _uv_write_cb); + } + catch + { + _callback = null; + _state = null; + Unpin(this); + throw; + } + } + private static void Unpin(UvWriteReq req) { foreach (var pin in req._pins) diff --git a/src/Microsoft.AspNet.Server.Kestrel/project.json b/src/Microsoft.AspNet.Server.Kestrel/project.json index 35f11ed90e..fb04e22a0b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/project.json +++ b/src/Microsoft.AspNet.Server.Kestrel/project.json @@ -6,7 +6,7 @@ "url": "git://github.com/aspnet/kestrelhttpserver" }, "dependencies": { - "Microsoft.Framework.Runtime.Abstractions": "1.0.0-*" + "Microsoft.Framework.Runtime.Abstractions": "1.0.0-beta7-*" }, "frameworks": { "dnx451": { }, diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs new file mode 100644 index 0000000000..651800e54e --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs @@ -0,0 +1,251 @@ +using Microsoft.AspNet.Server.Kestrel; +using Microsoft.AspNet.Server.Kestrel.Networking; +using Microsoft.Framework.Runtime; +using Microsoft.Framework.Runtime.Infrastructure; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.AspNet.Server.KestrelTests +{ + public class MultipleLoopTests + { + Libuv _uv; + public MultipleLoopTests() + { + var engine = new KestrelEngine(LibraryManager, new ShutdownNotImplemented()); + _uv = engine.Libuv; + } + + ILibraryManager LibraryManager + { + get + { + var locator = CallContextServiceLocator.Locator; + if (locator == null) + { + return null; + } + var services = locator.ServiceProvider; + if (services == null) + { + return null; + } + return (ILibraryManager)services.GetService(typeof(ILibraryManager)); + } + } + + [Fact] + public void InitAndCloseServerPipe() + { + var loop = new UvLoopHandle(); + var pipe = new UvPipeHandle(); + + loop.Init(_uv); + pipe.Init(loop, true); + pipe.Bind(@"\\.\pipe\InitAndCloseServerPipe"); + pipe.Dispose(); + + loop.Run(); + + pipe.Dispose(); + loop.Dispose(); + + } + + [Fact] + public void ServerPipeListenForConnections() + { + var loop = new UvLoopHandle(); + var serverListenPipe = new UvPipeHandle(); + + loop.Init(_uv); + serverListenPipe.Init(loop, false); + serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections"); + serverListenPipe.Listen(128, (_1, status, error, _2) => + { + var serverConnectionPipe = new UvPipeHandle(); + serverConnectionPipe.Init(loop, true); + try + { + serverListenPipe.Accept(serverConnectionPipe); + } + catch (Exception) + { + serverConnectionPipe.Dispose(); + return; + } + + var writeRequest = new UvWriteReq(); + writeRequest.Init(loop); + writeRequest.Write( + serverConnectionPipe, + new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), + (_3, status2, error2, _4) => + { + writeRequest.Dispose(); + serverConnectionPipe.Dispose(); + serverListenPipe.Dispose(); + }, + null); + + }, null); + + var worker = new Thread(() => + { + var loop2 = new UvLoopHandle(); + var clientConnectionPipe = new UvPipeHandle(); + var connect = new UvConnectRequest(); + + loop2.Init(_uv); + clientConnectionPipe.Init(loop2, true); + connect.Init(loop2); + connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) => + { + var buf = loop2.Libuv.buf_init(Marshal.AllocHGlobal(8192), 8192); + + connect.Dispose(); + clientConnectionPipe.ReadStart( + (_3, cb, _4) => buf, + (_3, status2, error2, _4) => + { + if (status2 <= 0) + { + clientConnectionPipe.Dispose(); + } + }, + null); + }, null); + loop2.Run(); + loop2.Dispose(); + }); + worker.Start(); + loop.Run(); + loop.Dispose(); + worker.Join(); + } + + + [Fact] + public void ServerPipeDispatchConnections() + { + + var loop = new UvLoopHandle(); + loop.Init(_uv); + + var serverConnectionPipe = default(UvPipeHandle); + + var serverListenPipe = new UvPipeHandle(); + serverListenPipe.Init(loop, false); + serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections"); + serverListenPipe.Listen(128, (_1, status, error, _2) => + { + serverConnectionPipe = new UvPipeHandle(); + serverConnectionPipe.Init(loop, true); + try + { + serverListenPipe.Accept(serverConnectionPipe); + } + catch (Exception) + { + serverConnectionPipe.Dispose(); + serverConnectionPipe = null; + } + }, null); + + var serverListenTcp = new UvTcpHandle(); + serverListenTcp.Init(loop); + serverListenTcp.Bind(new IPEndPoint(0, 54321)); + serverListenTcp.Listen(128, (_1, status, error, _2) => + { + var serverConnectionTcp = new UvTcpHandle(); + serverConnectionTcp.Init(loop); + serverListenTcp.Accept(serverConnectionTcp); + + var writeRequest = new UvWriteReq(); + writeRequest.Init(loop); + writeRequest.Write2( + serverConnectionPipe, + new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), + serverConnectionTcp, + (_3, status2, error2, _4) => + { + writeRequest.Dispose(); + serverConnectionPipe.Dispose(); + serverConnectionTcp.Dispose(); + serverListenPipe.Dispose(); + serverListenTcp.Dispose(); + }, + null); + }, null); + + var worker = new Thread(() => + { + var loop2 = new UvLoopHandle(); + var clientConnectionPipe = new UvPipeHandle(); + var connect = new UvConnectRequest(); + + loop2.Init(_uv); + clientConnectionPipe.Init(loop2, true); + connect.Init(loop2); + connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) => + { + connect.Dispose(); + + var buf = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64); + clientConnectionPipe.ReadStart( + (_3, cb, _4) => buf, + (_3, status2, error2, _4) => + { + if (status2 <= 0) + { + clientConnectionPipe.Dispose(); + return; + } + var clientConnectionTcp = new UvTcpHandle(); + clientConnectionTcp.Init(loop2); + clientConnectionPipe.Accept(clientConnectionTcp); + var buf2 = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64); + clientConnectionTcp.ReadStart( + (_5, cb, _6) => buf2, + (_5, status3, error3, _6) => + { + if (status3 <= 0) + { + clientConnectionTcp.Dispose(); + } + }, + null); + }, + null); + }, null); + loop2.Run(); + loop2.Dispose(); + }); + + var worker2 = new Thread(() => + { + var socket = new Socket(SocketType.Stream, ProtocolType.IP); + socket.Connect(IPAddress.Loopback, 54321); + socket.Send(new byte[] { 6, 7, 8, 9 }); + socket.Shutdown(SocketShutdown.Send); + var cb = socket.Receive(new byte[64]); + socket.Dispose(); + }); + + worker.Start(); + worker2.Start(); + + loop.Run(); + loop.Dispose(); + worker.Join(); + worker2.Join(); + } + } +} diff --git a/test/Microsoft.AspNet.Server.KestrelTests/Program.cs b/test/Microsoft.AspNet.Server.KestrelTests/Program.cs index adcc2288c5..2cfa625390 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/Program.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/Program.cs @@ -1,6 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using Microsoft.Framework.Runtime; +using System; + namespace Microsoft.AspNet.Server.KestrelTests { /// @@ -8,9 +11,21 @@ namespace Microsoft.AspNet.Server.KestrelTests /// public class Program { - public void Main() + private readonly IApplicationEnvironment env; + private readonly IServiceProvider sp; + + public Program(IApplicationEnvironment env, IServiceProvider sp) { - new EngineTests().DisconnectingClient().Wait(); + this.env = env; + this.sp = sp; + } + + public int Main() + { + return new Xunit.Runner.AspNet.Program(env, sp).Main(new string[] { + "-class", + typeof(MultipleLoopTests).FullName + }); } } } \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestrelTests/project.json b/test/Microsoft.AspNet.Server.KestrelTests/project.json index afb7e9ad47..68c1a69290 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/project.json +++ b/test/Microsoft.AspNet.Server.KestrelTests/project.json @@ -18,7 +18,7 @@ "allowUnsafe": true }, "commands": { - "run": "xunit.runner.aspnet", + "run": "Microsoft.AspNet.Server.KestrelTests", "test": "xunit.runner.aspnet" } }