Adjusting Kestrel to run cross platform

This commit is contained in:
Louis DeJardin 2014-06-27 20:01:44 -07:00
parent 9c29ccdd32
commit e4b9bd265c
8 changed files with 203 additions and 53 deletions

View File

@ -74,11 +74,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
return new Libuv.uv_buf_t
{
memory = SocketInput.Pin(2048),
len = 2048
};
return handle.Libuv.buf_init(
SocketInput.Pin(2048),
2048);
}
private void OnRead(UvStreamHandle handle, int nread)

View File

@ -62,7 +62,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
ListenSocket = new UvTcpHandle();
ListenSocket.Init(Thread.Loop);
ListenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, port));
ListenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
ListenSocket.Listen(10, _connectionCallback, this);
tcs.SetResult(0);
}
@ -86,7 +86,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Dispose()
{
Console.WriteLine("Listener.Dispose");
var tcs = new TaskCompletionSource<int>();
Thread.Post(
_ =>

View File

@ -28,6 +28,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Write(ArraySegment<byte> buffer, Action<object> callback, object state)
{
//TODO: need buffering that works
var copy = new byte[buffer.Count];
Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count);
buffer = new ArraySegment<byte>(copy);
KestrelTrace.Log.ConnectionWrite(0, buffer.Count);
var req = new ThisWriteReq();
req.Init(_thread.Loop);
@ -70,24 +75,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Write()
{
_pin = GCHandle.Alloc(_buffer.Array, GCHandleType.Pinned);
var buf = new Libuv.uv_buf_t
{
len = (uint)_buffer.Count,
memory = _pin.AddrOfPinnedObject() + _buffer.Offset
};
Write(
_socket,
new[] { buf },
1,
new ArraySegment<ArraySegment<byte>>(
new[]{_buffer}),
_writeCallback,
this);
}
private void OnWrite(UvWriteReq req, int status)
{
_pin.Free();
KestrelTrace.Log.ConnectionWriteCallback(0, status);
//NOTE: pool this?
Dispose();

View File

@ -81,6 +81,17 @@ namespace Microsoft.AspNet.Server.Kestrel
_post.Send();
}
public Task PostAsync(Action<object> callback, object state)
{
var tcs = new TaskCompletionSource<int>();
lock (_workSync)
{
_workAdding.Enqueue(new Work { Callback = callback, State = state, Completion = tcs });
}
_post.Send();
return tcs.Task;
}
private void ThreadStart(object parameter)
{
var tcs = (TaskCompletionSource<int>)parameter;
@ -103,12 +114,19 @@ namespace Microsoft.AspNet.Server.Kestrel
return;
}
// run the loop one more time to delete the _post handle
// run the loop one more time to delete the open handles
_post.Reference();
_post.DangerousClose();
_engine.Libuv.walk(
_loop,
(ptr, arg) =>
{
var handle = UvMemory.FromIntPtr<UvHandle>(ptr);
handle.Dispose();
},
IntPtr.Zero);
var ran2 = _loop.Run();
// delete the last of the unmanaged memory
_loop.Dispose();
}
catch (Exception ex)
@ -131,10 +149,26 @@ namespace Microsoft.AspNet.Server.Kestrel
try
{
work.Callback(work.State);
if (work.Completion != null)
{
ThreadPool.QueueUserWorkItem(
tcs =>
{
((TaskCompletionSource<int>)tcs).SetResult(0);
},
work.Completion);
}
}
catch (Exception ex)
{
//TODO: unhandled exceptions
if (work.Completion != null)
{
ThreadPool.QueueUserWorkItem(_ => work.Completion.SetException(ex), null);
}
else
{
// TODO: unobserved exception?
}
}
}
}
@ -143,6 +177,7 @@ namespace Microsoft.AspNet.Server.Kestrel
{
public Action<object> Callback;
public object State;
public TaskCompletionSource<int> Completion;
}
}
}

View File

@ -1,83 +1,83 @@
using System;
using System.Diagnostics.Tracing;
//using System.Diagnostics.Tracing;
namespace Microsoft.AspNet.Server.Kestrel
{
/// <summary>
/// Summary description for KestrelTrace
/// </summary>
public class KestrelTrace : EventSource
public class KestrelTrace //: EventSource
{
public static KestrelTrace Log = new KestrelTrace();
static EventTask Connection = (EventTask)1;
static EventTask Frame = (EventTask)1;
// static EventTask Connection = (EventTask)1;
// static EventTask Frame = (EventTask)1;
[Event(13, Level = EventLevel.Informational, Message = "Id {0}")]
// [Event(13, Level = EventLevel.Informational, Message = "Id {0}")]
public void ConnectionStart(long connectionId)
{
WriteEvent(13, connectionId);
// WriteEvent(13, connectionId);
}
[Event(14, Level = EventLevel.Informational, Message = "Id {0}")]
// [Event(14, Level = EventLevel.Informational, Message = "Id {0}")]
public void ConnectionStop(long connectionId)
{
WriteEvent(14, connectionId);
// WriteEvent(14, connectionId);
}
[Event(4, Message = "Id {0} Status {1}")]
// [Event(4, Message = "Id {0} Status {1}")]
internal void ConnectionRead(long connectionId, int status)
{
WriteEvent(4, connectionId, status);
// WriteEvent(4, connectionId, status);
}
[Event(5, Message = "Id {0}")]
// [Event(5, Message = "Id {0}")]
internal void ConnectionPause(long connectionId)
{
WriteEvent(5, connectionId);
// WriteEvent(5, connectionId);
}
[Event(6, Message = "Id {0}")]
// [Event(6, Message = "Id {0}")]
internal void ConnectionResume(long connectionId)
{
WriteEvent(6, connectionId);
// WriteEvent(6, connectionId);
}
[Event(7, Message = "Id {0}")]
// [Event(7, Message = "Id {0}")]
internal void ConnectionReadFin(long connectionId)
{
WriteEvent(7, connectionId);
// WriteEvent(7, connectionId);
}
[Event(8, Message = "Id {0} Step {1}")]
// [Event(8, Message = "Id {0} Step {1}")]
internal void ConnectionWriteFin(long connectionId, int step)
{
WriteEvent(8, connectionId, step);
// WriteEvent(8, connectionId, step);
}
[Event(9, Message = "Id {0}")]
// [Event(9, Message = "Id {0}")]
internal void ConnectionKeepAlive(long connectionId)
{
WriteEvent(9, connectionId);
// WriteEvent(9, connectionId);
}
[Event(10, Message = "Id {0}")]
// [Event(10, Message = "Id {0}")]
internal void ConnectionDisconnect(long connectionId)
{
WriteEvent(10, connectionId);
// WriteEvent(10, connectionId);
}
[Event(11, Message = "Id {0} Count {1}")]
// [Event(11, Message = "Id {0} Count {1}")]
internal void ConnectionWrite(long connectionId, int count)
{
WriteEvent(11, connectionId, count);
// WriteEvent(11, connectionId, count);
}
[Event(12, Message = "Id {0} Status {1}")]
// [Event(12, Message = "Id {0} Status {1}")]
internal void ConnectionWriteCallback(long connectionId, int status)
{
WriteEvent(12, connectionId, status);
// WriteEvent(12, connectionId, status);
}
}
}

View File

@ -35,12 +35,23 @@ namespace Microsoft.AspNet.Server.KestralTests
{
get
{
var services = CallContextServiceLocator.Locator.ServiceProvider;
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
try
{
var locator = CallContextServiceLocator.Locator;
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
{
return null;
}
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
} catch (NullReferenceException) { return null; }
}
}
private async Task AppChunked(Frame frame)
{
var data = new MemoryStream();
@ -85,6 +96,7 @@ namespace Microsoft.AspNet.Server.KestralTests
engine.Start(1);
var started = engine.CreateServer("http", "localhost", 54321, App);
Console.WriteLine("Started");
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(new IPEndPoint(IPAddress.Loopback, 54321));
socket.Send(Encoding.ASCII.GetBytes("POST / HTTP/1.0\r\n\r\nHello World"));

View File

@ -30,7 +30,16 @@ namespace Microsoft.AspNet.Server.KestralTests
{
get
{
var services = CallContextServiceLocator.Locator.ServiceProvider;
var locator = CallContextServiceLocator.Locator;
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
{
return null;
}
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
}
}
@ -123,12 +132,13 @@ namespace Microsoft.AspNet.Server.KestralTests
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (_, status, state) =>
{
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle();
tcp2.Init(loop);
tcp.Accept(tcp2);
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => new Libuv.uv_buf_t { memory = data, len = 500 },
(a, b, c) => _uv.buf_init(data, 500),
(__, nread, state2) =>
{
bytesRead += nread;
@ -140,6 +150,7 @@ namespace Microsoft.AspNet.Server.KestralTests
null);
tcp.Dispose();
}, null);
Console.WriteLine("Task.Run");
var t = Task.Run(async () =>
{
var socket = new Socket(
@ -165,5 +176,92 @@ namespace Microsoft.AspNet.Server.KestralTests
loop.Dispose();
await t;
}
[Fact]
public async Task SocketCanReadAndWrite()
{
int bytesRead = 0;
var loop = new UvLoopHandle();
loop.Init(_uv);
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (_, status, state) =>
{
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle();
tcp2.Init(loop);
tcp.Accept(tcp2);
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => tcp2.Libuv.buf_init(data, 500),
(__, nread, state2) =>
{
bytesRead += nread;
if (nread == 0)
{
tcp2.Dispose();
}
else
{
for (var x = 0; x != 2; ++x)
{
var req = new UvWriteReq();
req.Init(loop);
req.Write(
tcp2,
new ArraySegment<ArraySegment<byte>>(
new[]{new ArraySegment<byte>(new byte[]{65,66,67,68,69})}
),
(___,____,_____)=>{},
null);
}
}
},
null);
tcp.Dispose();
}, null);
Console.WriteLine("Task.Run");
var t = Task.Run(async () =>
{
var socket = new Socket(
AddressFamily.InterNetwork,
SocketType.Stream,
ProtocolType.Tcp);
await Task.Factory.FromAsync(
socket.BeginConnect,
socket.EndConnect,
new IPEndPoint(IPAddress.Loopback, 54321),
null,
TaskCreationOptions.None);
await Task.Factory.FromAsync(
socket.BeginSend,
socket.EndSend,
new[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4, 5 }) },
SocketFlags.None,
null,
TaskCreationOptions.None);
socket.Shutdown(SocketShutdown.Send);
var buffer = new ArraySegment<byte>(new byte[2048]);
for(;;)
{
var count = await Task.Factory.FromAsync(
socket.BeginReceive,
socket.EndReceive,
new[] { buffer },
SocketFlags.None,
null,
TaskCreationOptions.None);
Console.WriteLine("count {0} {1}",
count,
System.Text.Encoding.ASCII.GetString(buffer.Array, 0, count));
if (count <= 0) break;
}
socket.Dispose();
});
loop.Run();
loop.Dispose();
await t;
}
}
}

View File

@ -24,8 +24,19 @@ namespace Microsoft.AspNet.Server.KestralTests
{
get
{
var services = CallContextServiceLocator.Locator.ServiceProvider;
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
try{
var locator = CallContextServiceLocator.Locator;
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
{
return null;
}
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
} catch (NullReferenceException) { return null; }
}
}