Using named pipes to dispatch connections to multiple threads

This commit is contained in:
Louis DeJardin 2015-07-24 13:49:11 -07:00
parent ceeb4edabd
commit 42246fd51b
12 changed files with 561 additions and 15 deletions

View File

@ -38,7 +38,7 @@ namespace Kestrel
var disposables = new List<IDisposable>();
var information = (ServerInformation)serverInformation;
var engine = new KestrelEngine(_libraryManager, _appShutdownService);
engine.Start(1);
engine.Start(Environment.ProcessorCount);
foreach (var address in information.Addresses)
{
disposables.Add(engine.CreateServer(

View File

@ -53,7 +53,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ListenSocket = new UvTcpHandle();
ListenSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
ListenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
ListenSocket.Listen(10, _connectionCallback, this);
ListenSocket.Listen(128, _connectionCallback, this);
tcs.SetResult(0);
}
catch (Exception ex)
@ -64,13 +64,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return tcs.Task;
}
private void OnConnection(UvStreamHandle listenSocket, int status)
protected void OnConnection(UvStreamHandle listenSocket, int status)
{
var acceptSocket = new UvTcpHandle();
acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
listenSocket.Accept(acceptSocket);
var connection = new Connection(this, acceptSocket);
DispatchConnection(acceptSocket);
}
protected virtual void DispatchConnection(UvTcpHandle socket)
{
var connection = new Connection(this, socket);
connection.Start();
}

View File

@ -0,0 +1,88 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class ListenerPrimary : Listener
{
UvPipeHandle ListenPipe { get; set; }
List<UvPipeHandle> _dispatchPipes = new List<UvPipeHandle>();
int _dispatchIndex;
ArraySegment<ArraySegment<byte>> _1234 = new ArraySegment<ArraySegment<byte>>(new[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) });
public ListenerPrimary(IMemoryPool memory) : base(memory)
{
}
public async Task StartAsync(
string pipeName,
string scheme,
string host,
int port,
KestrelThread thread,
Func<Frame, Task> application)
{
await StartAsync(scheme, host, port, thread, application);
await Thread.PostAsync(_ =>
{
ListenPipe = new UvPipeHandle();
ListenPipe.Init(Thread.Loop, false);
ListenPipe.Bind(pipeName);
ListenPipe.Listen(128, OnListenPipe, null);
}, null);
}
private void OnListenPipe(UvStreamHandle pipe, int status, Exception error, object state)
{
if (status < 0)
{
return;
}
var dispatchPipe = new UvPipeHandle();
dispatchPipe.Init(Thread.Loop, true);
try
{
pipe.Accept(dispatchPipe);
}
catch (Exception)
{
dispatchPipe.Dispose();
return;
}
_dispatchPipes.Add(dispatchPipe);
}
protected override void DispatchConnection(UvTcpHandle socket)
{
var index = _dispatchIndex++ % (_dispatchPipes.Count + 1);
if (index == _dispatchPipes.Count)
{
base.DispatchConnection(socket);
}
else
{
var dispatchPipe = _dispatchPipes[index];
var write = new UvWriteReq();
write.Init(Thread.Loop);
write.Write2(
dispatchPipe,
_1234,
socket,
(write2, status, error, state) =>
{
write.Dispose();
((UvTcpHandle)state).Dispose();
},
socket);
}
}
}
}

View File

@ -0,0 +1,106 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class ListenerSecondary : ListenerContext, IDisposable
{
UvPipeHandle DispatchPipe { get; set; }
public ListenerSecondary(IMemoryPool memory)
{
Memory = memory;
}
public Task StartAsync(
string pipeName,
KestrelThread thread,
Func<Frame, Task> application)
{
Thread = thread;
Application = application;
DispatchPipe = new UvPipeHandle();
var tcs = new TaskCompletionSource<int>();
Thread.Post(_ =>
{
try
{
DispatchPipe.Init(Thread.Loop, true);
var connect = new UvConnectRequest();
connect.Init(Thread.Loop);
connect.Connect(
DispatchPipe,
pipeName,
(connect2, status, error, state) =>
{
connect.Dispose();
if (error != null)
{
tcs.SetException(error);
return;
}
try
{
var ptr = Marshal.AllocHGlobal(16);
var buf = Thread.Loop.Libuv.buf_init(ptr, 16);
DispatchPipe.ReadStart(
(_1, _2, _3) => buf,
(_1, status2, error2, state2) =>
{
if (status2 <= 0)
{
DispatchPipe.Dispose();
Marshal.FreeHGlobal(ptr);
return;
}
var acceptSocket = new UvTcpHandle();
acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
DispatchPipe.Accept(acceptSocket);
var connection = new Connection(this, acceptSocket);
connection.Start();
},
null);
tcs.SetResult(0);
}
catch (Exception ex)
{
DispatchPipe.Dispose();
tcs.SetException(ex);
}
},
null);
}
catch (Exception ex)
{
DispatchPipe.Dispose();
tcs.SetException(ex);
}
}, null);
return tcs.Task;
}
public void Dispose()
{
// Ensure the event loop is still running.
// If the event loop isn't running and we try to wait on this Post
// to complete, then KestrelEngine will never be disposed and
// the exception that stopped the event loop will never be surfaced.
if (Thread.FatalError == null)
{
Thread.Send(_ => DispatchPipe.Dispose(), null);
}
}
}
}

View File

@ -118,6 +118,18 @@ namespace Microsoft.AspNet.Server.Kestrel
return tcs.Task;
}
public void Send(Action<object> callback, object state)
{
if (_loop.ThreadId == Thread.CurrentThread.ManagedThreadId)
{
callback.Invoke(state);
}
else
{
PostAsync(callback, state).Wait();
}
}
private void PostCloseHandle(Action<IntPtr> callback, IntPtr handle)
{
lock (_workSync)

View File

@ -38,10 +38,10 @@ namespace Microsoft.AspNet.Server.Kestrel
: "amd64";
libraryPath = Path.Combine(
libraryPath,
libraryPath,
"native",
"windows",
architecture,
architecture,
"libuv.dll");
}
else if (Libuv.IsDarwin)
@ -91,16 +91,37 @@ namespace Microsoft.AspNet.Server.Kestrel
public IDisposable CreateServer(string scheme, string host, int port, Func<Frame, Task> application)
{
var listeners = new List<Listener>();
var listeners = new List<IDisposable>();
try
{
var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var single = Threads.Count == 1;
var first = true;
foreach (var thread in Threads)
{
var listener = new Listener(Memory);
if (single)
{
var listener = new Listener(Memory);
listeners.Add(listener);
listener.StartAsync(scheme, host, port, thread, application).Wait();
}
else if (first)
{
var listener = new ListenerPrimary(Memory);
listeners.Add(listener);
listener.StartAsync(pipeName, scheme, host, port, thread, application).Wait();
}
else
{
var listener = new ListenerSecondary(Memory);
listeners.Add(listener);
listener.StartAsync(pipeName, thread, application).Wait();
}
listeners.Add(listener);
listener.StartAsync(scheme, host, port, thread, application).Wait();
first = false;
}
return new Disposable(() =>
{

View File

@ -46,7 +46,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
_listenCallback = callback;
_listenState = state;
_listenVitality = GCHandle.Alloc(this, GCHandleType.Normal);
_uv.listen(this, 10, _uv_connection_cb);
_uv.listen(this, backlog, _uv_connection_cb);
}
catch
{

View File

@ -81,6 +81,54 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
}
}
public unsafe void Write2(
UvStreamHandle handle,
ArraySegment<ArraySegment<byte>> bufs,
UvStreamHandle sendHandle,
Action<UvWriteReq, int, Exception, object> callback,
object state)
{
try
{
// add GCHandle to keeps this SafeHandle alive while request processing
_pins.Add(GCHandle.Alloc(this, GCHandleType.Normal));
var pBuffers = (Libuv.uv_buf_t*)_bufs;
var nBuffers = bufs.Count;
if (nBuffers > BUFFER_COUNT)
{
// create and pin buffer array when it's larger than the pre-allocated one
var bufArray = new Libuv.uv_buf_t[nBuffers];
var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject();
}
for (var index = 0; index != nBuffers; ++index)
{
// create and pin each segment being written
var buf = bufs.Array[bufs.Offset + index];
var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned);
_pins.Add(gcHandle);
pBuffers[index] = Libuv.buf_init(
gcHandle.AddrOfPinnedObject() + buf.Offset,
buf.Count);
}
_callback = callback;
_state = state;
_uv.write2(this, handle, pBuffers, nBuffers, sendHandle, _uv_write_cb);
}
catch
{
_callback = null;
_state = null;
Unpin(this);
throw;
}
}
private static void Unpin(UvWriteReq req)
{
foreach (var pin in req._pins)

View File

@ -6,7 +6,7 @@
"url": "git://github.com/aspnet/kestrelhttpserver"
},
"dependencies": {
"Microsoft.Framework.Runtime.Abstractions": "1.0.0-*"
"Microsoft.Framework.Runtime.Abstractions": "1.0.0-beta7-*"
},
"frameworks": {
"dnx451": { },

View File

@ -0,0 +1,251 @@
using Microsoft.AspNet.Server.Kestrel;
using Microsoft.AspNet.Server.Kestrel.Networking;
using Microsoft.Framework.Runtime;
using Microsoft.Framework.Runtime.Infrastructure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestrelTests
{
public class MultipleLoopTests
{
Libuv _uv;
public MultipleLoopTests()
{
var engine = new KestrelEngine(LibraryManager, new ShutdownNotImplemented());
_uv = engine.Libuv;
}
ILibraryManager LibraryManager
{
get
{
var locator = CallContextServiceLocator.Locator;
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
{
return null;
}
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
}
}
[Fact]
public void InitAndCloseServerPipe()
{
var loop = new UvLoopHandle();
var pipe = new UvPipeHandle();
loop.Init(_uv);
pipe.Init(loop, true);
pipe.Bind(@"\\.\pipe\InitAndCloseServerPipe");
pipe.Dispose();
loop.Run();
pipe.Dispose();
loop.Dispose();
}
[Fact]
public void ServerPipeListenForConnections()
{
var loop = new UvLoopHandle();
var serverListenPipe = new UvPipeHandle();
loop.Init(_uv);
serverListenPipe.Init(loop, false);
serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections");
serverListenPipe.Listen(128, (_1, status, error, _2) =>
{
var serverConnectionPipe = new UvPipeHandle();
serverConnectionPipe.Init(loop, true);
try
{
serverListenPipe.Accept(serverConnectionPipe);
}
catch (Exception)
{
serverConnectionPipe.Dispose();
return;
}
var writeRequest = new UvWriteReq();
writeRequest.Init(loop);
writeRequest.Write(
serverConnectionPipe,
new ArraySegment<ArraySegment<byte>>(new ArraySegment<byte>[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) }),
(_3, status2, error2, _4) =>
{
writeRequest.Dispose();
serverConnectionPipe.Dispose();
serverListenPipe.Dispose();
},
null);
}, null);
var worker = new Thread(() =>
{
var loop2 = new UvLoopHandle();
var clientConnectionPipe = new UvPipeHandle();
var connect = new UvConnectRequest();
loop2.Init(_uv);
clientConnectionPipe.Init(loop2, true);
connect.Init(loop2);
connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) =>
{
var buf = loop2.Libuv.buf_init(Marshal.AllocHGlobal(8192), 8192);
connect.Dispose();
clientConnectionPipe.ReadStart(
(_3, cb, _4) => buf,
(_3, status2, error2, _4) =>
{
if (status2 <= 0)
{
clientConnectionPipe.Dispose();
}
},
null);
}, null);
loop2.Run();
loop2.Dispose();
});
worker.Start();
loop.Run();
loop.Dispose();
worker.Join();
}
[Fact]
public void ServerPipeDispatchConnections()
{
var loop = new UvLoopHandle();
loop.Init(_uv);
var serverConnectionPipe = default(UvPipeHandle);
var serverListenPipe = new UvPipeHandle();
serverListenPipe.Init(loop, false);
serverListenPipe.Bind(@"\\.\pipe\ServerPipeListenForConnections");
serverListenPipe.Listen(128, (_1, status, error, _2) =>
{
serverConnectionPipe = new UvPipeHandle();
serverConnectionPipe.Init(loop, true);
try
{
serverListenPipe.Accept(serverConnectionPipe);
}
catch (Exception)
{
serverConnectionPipe.Dispose();
serverConnectionPipe = null;
}
}, null);
var serverListenTcp = new UvTcpHandle();
serverListenTcp.Init(loop);
serverListenTcp.Bind(new IPEndPoint(0, 54321));
serverListenTcp.Listen(128, (_1, status, error, _2) =>
{
var serverConnectionTcp = new UvTcpHandle();
serverConnectionTcp.Init(loop);
serverListenTcp.Accept(serverConnectionTcp);
var writeRequest = new UvWriteReq();
writeRequest.Init(loop);
writeRequest.Write2(
serverConnectionPipe,
new ArraySegment<ArraySegment<byte>>(new ArraySegment<byte>[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) }),
serverConnectionTcp,
(_3, status2, error2, _4) =>
{
writeRequest.Dispose();
serverConnectionPipe.Dispose();
serverConnectionTcp.Dispose();
serverListenPipe.Dispose();
serverListenTcp.Dispose();
},
null);
}, null);
var worker = new Thread(() =>
{
var loop2 = new UvLoopHandle();
var clientConnectionPipe = new UvPipeHandle();
var connect = new UvConnectRequest();
loop2.Init(_uv);
clientConnectionPipe.Init(loop2, true);
connect.Init(loop2);
connect.Connect(clientConnectionPipe, @"\\.\pipe\ServerPipeListenForConnections", (_1, status, error, _2) =>
{
connect.Dispose();
var buf = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64);
clientConnectionPipe.ReadStart(
(_3, cb, _4) => buf,
(_3, status2, error2, _4) =>
{
if (status2 <= 0)
{
clientConnectionPipe.Dispose();
return;
}
var clientConnectionTcp = new UvTcpHandle();
clientConnectionTcp.Init(loop2);
clientConnectionPipe.Accept(clientConnectionTcp);
var buf2 = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64);
clientConnectionTcp.ReadStart(
(_5, cb, _6) => buf2,
(_5, status3, error3, _6) =>
{
if (status3 <= 0)
{
clientConnectionTcp.Dispose();
}
},
null);
},
null);
}, null);
loop2.Run();
loop2.Dispose();
});
var worker2 = new Thread(() =>
{
var socket = new Socket(SocketType.Stream, ProtocolType.IP);
socket.Connect(IPAddress.Loopback, 54321);
socket.Send(new byte[] { 6, 7, 8, 9 });
socket.Shutdown(SocketShutdown.Send);
var cb = socket.Receive(new byte[64]);
socket.Dispose();
});
worker.Start();
worker2.Start();
loop.Run();
loop.Dispose();
worker.Join();
worker2.Join();
}
}
}

View File

@ -1,6 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.Framework.Runtime;
using System;
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
@ -8,9 +11,21 @@ namespace Microsoft.AspNet.Server.KestrelTests
/// </summary>
public class Program
{
public void Main()
private readonly IApplicationEnvironment env;
private readonly IServiceProvider sp;
public Program(IApplicationEnvironment env, IServiceProvider sp)
{
new EngineTests().DisconnectingClient().Wait();
this.env = env;
this.sp = sp;
}
public int Main()
{
return new Xunit.Runner.AspNet.Program(env, sp).Main(new string[] {
"-class",
typeof(MultipleLoopTests).FullName
});
}
}
}

View File

@ -18,7 +18,7 @@
"allowUnsafe": true
},
"commands": {
"run": "xunit.runner.aspnet",
"run": "Microsoft.AspNet.Server.KestrelTests",
"test": "xunit.runner.aspnet"
}
}