Improving callback exception support

Also renaming KestralTests to KestrelTests
This commit is contained in:
Louis DeJardin 2014-07-07 13:02:18 -07:00
parent 6edd238f38
commit 8624b82b2b
17 changed files with 209 additions and 67 deletions

View File

@ -4,6 +4,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Networking;
using System.Diagnostics;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
@ -39,7 +40,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public class Connection : ConnectionContext, IConnectionControl
{
private static readonly Action<UvStreamHandle, int, object> _readCallback = ReadCallback;
private static readonly Action<UvStreamHandle, int, Exception, object> _readCallback = ReadCallback;
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = AllocCallback;
private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
@ -47,9 +48,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return ((Connection)state).OnAlloc(handle, suggestedSize);
}
private static void ReadCallback(UvStreamHandle handle, int nread, object state)
private static void ReadCallback(UvStreamHandle handle, int nread, Exception error, object state)
{
((Connection)state).OnRead(handle, nread);
((Connection)state).OnRead(handle, nread, error);
}
private readonly UvStreamHandle _socket;
@ -79,14 +80,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
2048);
}
private void OnRead(UvStreamHandle handle, int nread)
private void OnRead(UvStreamHandle handle, int nread, Exception error)
{
SocketInput.Unpin(nread);
if (nread == 0)
if (nread == 0 || error != null)
{
SocketInput.RemoteIntakeFin = true;
KestrelTrace.Log.ConnectionReadFin(_connectionId);
if (error != null)
{
Trace.WriteLine("Connection.OnRead " + error.ToString());
}
}
else
{

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
@ -38,7 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public interface IFrameControl
{
void ProduceContinue();
void Write(ArraySegment<byte> data, Action<object> callback, object state);
void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state);
}
public class Frame : FrameContext, IFrameControl
@ -229,7 +230,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
public void Write(ArraySegment<byte> data, Action<object> callback, object state)
public void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state)
{
ProduceStart();
SocketOutput.Write(data, callback, state);
@ -255,7 +256,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
RequestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
SocketOutput.Write(new ArraySegment<byte>(_continueBytes, 0, _continueBytes.Length), _ => { }, null);
SocketOutput.Write(
new ArraySegment<byte>(_continueBytes, 0, _continueBytes.Length),
(error, _) =>
{
if (error != null)
{
Trace.WriteLine("ProduceContinue " + error.ToString());
}
},
null);
}
}
@ -269,7 +279,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase);
var responseHeader = CreateResponseHeader(status, ResponseHeaders);
SocketOutput.Write(responseHeader.Item1, x => ((IDisposable)x).Dispose(), responseHeader.Item2);
SocketOutput.Write(
responseHeader.Item1,
(error, x) =>
{
if (error != null)
{
Trace.WriteLine("ProduceStart " + error.ToString());
}
((IDisposable)x).Dispose();
},
responseHeader.Item2);
}
public void ProduceEnd(Exception ex)

View File

@ -25,7 +25,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override Task FlushAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(new ArraySegment<byte>(new byte[0]), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
_context.FrameControl.Write(
new ArraySegment<byte>(new byte[0]),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
}
@ -52,7 +66,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(new ArraySegment<byte>(buffer, offset, count), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
_context.FrameControl.Write(
new ArraySegment<byte>(buffer, offset, count),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
}

View File

@ -3,6 +3,7 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
@ -31,13 +32,20 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
/// </summary>
public class Listener : ListenerContext, IDisposable
{
private static readonly Action<UvStreamHandle, int, object> _connectionCallback = ConnectionCallback;
private static readonly Action<UvStreamHandle, int, Exception, object> _connectionCallback = ConnectionCallback;
UvTcpHandle ListenSocket { get; set; }
private static void ConnectionCallback(UvStreamHandle stream, int status, object state)
private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
((Listener)state).OnConnection(stream, status);
if (error != null)
{
Trace.WriteLine("Listener.ConnectionCallback " + error.ToString());
}
else
{
((Listener)state).OnConnection(stream, status);
}
}
public Listener(IMemoryPool memory)
@ -99,7 +107,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
tcs.SetException(ex);
}
},
},
null);
tcs.Task.Wait();
ListenSocket = null;

View File

@ -12,7 +12,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
/// </summary>
public interface ISocketOutput
{
void Write(ArraySegment<byte> buffer, Action<object> callback, object state);
void Write(ArraySegment<byte> buffer, Action<Exception, object> callback, object state);
}
public class SocketOutput : ISocketOutput
@ -26,7 +26,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_socket = socket;
}
public void Write(ArraySegment<byte> buffer, Action<object> callback, object state)
public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback, object state)
{
//TODO: need buffering that works
var copy = new byte[buffer.Count];
@ -45,17 +45,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public class ThisWriteReq : UvWriteReq
{
private static readonly Action<UvWriteReq, int, object> _writeCallback = WriteCallback;
private static void WriteCallback(UvWriteReq req, int status, object state)
private static readonly Action<UvWriteReq, int, Exception, object> _writeCallback = WriteCallback;
private static void WriteCallback(UvWriteReq req, int status, Exception error, object state)
{
((ThisWriteReq)state).OnWrite(req, status);
((ThisWriteReq)state).OnWrite(req, status, error);
}
SocketOutput _self;
ArraySegment<byte> _buffer;
Action<Exception> _drained;
UvStreamHandle _socket;
Action<object> _callback;
Action<Exception, object> _callback;
object _state;
GCHandle _pin;
@ -63,7 +63,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
SocketOutput socketOutput,
UvStreamHandle socket,
ArraySegment<byte> buffer,
Action<object> callback,
Action<Exception, object> callback,
object state)
{
_self = socketOutput;
@ -83,12 +83,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
this);
}
private void OnWrite(UvWriteReq req, int status)
private void OnWrite(UvWriteReq req, int status, Exception error)
{
KestrelTrace.Log.ConnectionWriteCallback(0, status);
//NOTE: pool this?
Dispose();
_callback(_state);
_callback(error, _state);
}
}

View File

@ -105,6 +105,7 @@ namespace Microsoft.AspNet.Server.Kestrel
{
tcs.SetException(ex);
}
try
{
var ran1 = _loop.Run();

View File

@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
namespace Microsoft.AspNet.Server.Kestrel.Networking
{
@ -11,16 +12,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private readonly static Libuv.uv_alloc_cb _uv_alloc_cb = UvAllocCb;
private readonly static Libuv.uv_read_cb _uv_read_cb = UvReadCb;
public Action<UvStreamHandle, int, object> _connectionCallback;
public Action<UvStreamHandle, int, Exception, object> _connectionCallback;
public object _connectionState;
public Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback;
public Action<UvStreamHandle, int, object> _readCallback;
public Action<UvStreamHandle, int, Exception, object> _readCallback;
public object _readState;
public void Listen(int backlog, Action<UvStreamHandle, int, object> callback, object state)
public void Listen(int backlog, Action<UvStreamHandle, int, Exception, object> callback, object state)
{
_connectionCallback = callback;
_connectionState = state;
@ -34,7 +35,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
public void ReadStart(
Func<UvStreamHandle, int, object, Libuv.uv_buf_t> allocCallback,
Action<UvStreamHandle, int, object> readCallback,
Action<UvStreamHandle, int, Exception, object> readCallback,
object state)
{
_allocCallback = allocCallback;
@ -57,28 +58,57 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private static void UvConnectionCb(IntPtr handle, int status)
{
var stream = FromIntPtr<UvStreamHandle>(handle);
stream._connectionCallback(stream, status, stream._connectionState);
Exception error;
status = stream.Libuv.Check(status, out error);
try
{
stream._connectionCallback(stream, status, error, stream._connectionState);
}
catch (Exception ex)
{
Trace.WriteLine("UvConnectionCb " + ex.ToString());
}
}
private static void UvAllocCb(IntPtr handle, int suggested_size, out Libuv.uv_buf_t buf)
{
var stream = FromIntPtr<UvStreamHandle>(handle);
buf = stream._allocCallback(stream, suggested_size, stream._readState);
try
{
buf = stream._allocCallback(stream, suggested_size, stream._readState);
}
catch (Exception ex)
{
Trace.WriteLine("UvAllocCb " + ex.ToString());
buf = stream.Libuv.buf_init(IntPtr.Zero, 0);
throw;
}
}
private static void UvReadCb(IntPtr handle, int nread, ref Libuv.uv_buf_t buf)
{
var stream = FromIntPtr<UvStreamHandle>(handle);
if (nread == -4095)
try
{
stream._readCallback(stream, 0, stream._readState);
return;
if (nread < 0)
{
Exception error;
stream._uv.Check(nread, out error);
stream._readCallback(stream, 0, error, stream._readState);
}
else
{
stream._readCallback(stream, nread, null, stream._readState);
}
}
catch (Exception ex)
{
Trace.WriteLine("UbReadCb " + ex.ToString());
}
var length = stream._uv.Check(nread);
stream._readCallback(stream, nread, stream._readState);
}
}

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace Microsoft.AspNet.Server.Kestrel.Networking
@ -16,12 +17,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
IntPtr _bufs;
Action<UvWriteReq, int, object> _callback;
Action<UvWriteReq, int, Exception, object> _callback;
object _state;
const int BUFFER_COUNT = 4;
List<GCHandle> _pins = new List<GCHandle>();
public void Init(UvLoopHandle loop)
{
var requestSize = loop.Libuv.req_size(Libuv.RequestType.WRITE);
@ -30,7 +31,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
_bufs = handle + requestSize;
}
public unsafe void Write(UvStreamHandle handle, ArraySegment<ArraySegment<byte>> bufs, Action<UvWriteReq, int, object> callback, object state)
public unsafe void Write(
UvStreamHandle handle,
ArraySegment<ArraySegment<byte>> bufs,
Action<UvWriteReq, int, Exception, object> callback,
object state)
{
var pBuffers = (Libuv.uv_buf_t*)_bufs;
var nBuffers = bufs.Count;
@ -59,14 +64,32 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private static void UvWriteCb(IntPtr ptr, int status)
{
var req = FromIntPtr<UvWriteReq>(ptr);
foreach(var pin in req._pins)
foreach (var pin in req._pins)
{
pin.Free();
}
req._pins.Clear();
req._callback(req, status, req._state);
var callback = req._callback;
req._callback = null;
var state = req._state;
req._state = null;
Exception error = null;
if (status < 0)
{
req.Libuv.Check(status, out error);
}
try
{
callback(req, status, error, state);
}
catch (Exception ex)
{
Trace.WriteLine("UvWriteCb " + ex.ToString());
}
}
}

View File

@ -10,7 +10,7 @@ using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for EngineTests
@ -35,20 +35,22 @@ namespace Microsoft.AspNet.Server.KestralTests
{
get
{
try
try
{
var locator = CallContextServiceLocator.Locator;
if (locator == null)
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
if (services == null)
{
return null;
}
return (ILibraryManager)services.GetService(typeof(ILibraryManager));
} catch (NullReferenceException) { return null; }
}
catch (NullReferenceException)
{ return null; }
}
}
@ -314,5 +316,29 @@ namespace Microsoft.AspNet.Server.KestralTests
}
}
[Fact]
public async Task DisconnectingClient()
{
using (var server = new TestServer(App))
{
var socket = new Socket(SocketType.Stream, ProtocolType.IP);
socket.Connect(IPAddress.Loopback, 54321);
await Task.Delay(200);
socket.Disconnect(false);
socket.Dispose();
await Task.Delay(200);
using (var connection = new TestConnection())
{
await connection.SendEnd(
"GET / HTTP/1.0",
"\r\n");
await connection.ReceiveEnd(
"HTTP/1.0 200 OK",
"\r\n");
}
}
}
}
}

View File

@ -4,7 +4,7 @@ using System;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for MessageBodyExchangerTests

View File

@ -6,7 +6,7 @@ using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for MessageBodyTests

View File

@ -12,7 +12,7 @@ using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for NetworkingTests
@ -31,12 +31,12 @@ namespace Microsoft.AspNet.Server.KestralTests
get
{
var locator = CallContextServiceLocator.Locator;
if (locator == null)
if (locator == null)
{
return null;
}
var services = locator.ServiceProvider;
if (services == null)
if (services == null)
{
return null;
}
@ -93,7 +93,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (stream, status, state) =>
tcp.Listen(10, (stream, status, error, state) =>
{
var tcp2 = new UvTcpHandle();
tcp2.Init(loop);
@ -130,7 +130,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (_, status, state) =>
tcp.Listen(10, (_, status, error, state) =>
{
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle();
@ -139,7 +139,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => _uv.buf_init(data, 500),
(__, nread, state2) =>
(__, nread, error2, state2) =>
{
bytesRead += nread;
if (nread == 0)
@ -186,7 +186,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var tcp = new UvTcpHandle();
tcp.Init(loop);
tcp.Bind(new IPEndPoint(IPAddress.Loopback, 54321));
tcp.Listen(10, (_, status, state) =>
tcp.Listen(10, (_, status, error, state) =>
{
Console.WriteLine("Connected");
var tcp2 = new UvTcpHandle();
@ -195,7 +195,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => tcp2.Libuv.buf_init(data, 500),
(__, nread, state2) =>
(__, nread, error2, state2) =>
{
bytesRead += nread;
if (nread == 0)
@ -211,9 +211,9 @@ namespace Microsoft.AspNet.Server.KestralTests
req.Write(
tcp2,
new ArraySegment<ArraySegment<byte>>(
new[]{new ArraySegment<byte>(new byte[]{65,66,67,68,69})}
new[] { new ArraySegment<byte>(new byte[] { 65, 66, 67, 68, 69 }) }
),
(___,____,_____)=>{},
(_1, _2, _3, _4) => { },
null);
}
}
@ -243,7 +243,7 @@ namespace Microsoft.AspNet.Server.KestralTests
TaskCreationOptions.None);
socket.Shutdown(SocketShutdown.Send);
var buffer = new ArraySegment<byte>(new byte[2048]);
for(;;)
for (; ;)
{
var count = await Task.Factory.FromAsync(
socket.BeginReceive,
@ -252,7 +252,7 @@ namespace Microsoft.AspNet.Server.KestralTests
SocketFlags.None,
null,
TaskCreationOptions.None);
Console.WriteLine("count {0} {1}",
Console.WriteLine("count {0} {1}",
count,
System.Text.Encoding.ASCII.GetString(buffer.Array, 0, count));
if (count <= 0) break;

View File

@ -1,6 +1,6 @@
using System;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for Program
@ -9,7 +9,7 @@ namespace Microsoft.AspNet.Server.KestralTests
{
public void Main()
{
new EngineTests().Http11().Wait();
new EngineTests().DisconnectingClient().Wait();
}
}
}

View File

@ -7,7 +7,7 @@ using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for TestConnection

View File

@ -1,7 +1,7 @@
using System;
using Microsoft.AspNet.Server.Kestrel.Http;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
class TestInput : IConnectionControl, IFrameControl
{
@ -44,7 +44,7 @@ namespace Microsoft.AspNet.Server.KestralTests
{
}
public void Write(ArraySegment<byte> data, Action<object> callback, object state)
public void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state)
{
}
public void End(ProduceEndType endType)

View File

@ -5,7 +5,7 @@ using Microsoft.Framework.Runtime.Infrastructure;
using System;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.KestralTests
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for TestServer