Merge branch 'benaadams/pool-writecontexts' into dev

This commit is contained in:
Stephen Halter 2015-12-13 02:57:27 -08:00
commit a696eb89d5
7 changed files with 145 additions and 44 deletions

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_connectionId = Interlocked.Increment(ref _lastConnectionId);
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
}
public void Start()

View File

@ -94,6 +94,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
var socket = (Listener)tcs2.Task.AsyncState;
socket.ListenSocket.Dispose();
var writeReqPool = socket.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
tcs2.SetResult(0);
}
catch (Exception ex)

View File

@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Http;
using System.Collections.Generic;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
@ -17,6 +18,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
: base(serviceContext)
{
Memory2 = new MemoryPool2();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
}
public ListenerContext(ListenerContext listenerContext)
@ -25,6 +27,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Memory2 = listenerContext.Memory2;
WriteReqPool = listenerContext.WriteReqPool;
Log = listenerContext.Log;
}
@ -33,5 +36,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public KestrelThread Thread { get; set; }
public MemoryPool2 Memory2 { get; set; }
public Queue<UvWriteReq> WriteReqPool { get; set; }
}
}

View File

@ -14,9 +14,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketOutput : ISocketOutput
{
public const int MaxPooledWriteReqs = 1024;
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;
private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
@ -42,12 +45,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// The number of write operations that have been scheduled so far
// but have not completed.
private int _writesPending = 0;
private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool;
public SocketOutput(
KestrelThread thread,
@ -56,7 +60,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Connection connection,
long connectionId,
IKestrelTrace log,
IThreadPool threadPool)
IThreadPool threadPool,
Queue<UvWriteReq> writeReqPool)
{
_thread = thread;
_socket = socket;
@ -66,6 +71,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool;
_head = memory.Lease();
_tail = _head;
@ -92,7 +99,14 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
if (_writeContextPool.Count > 0)
{
_nextWriteContext = _writeContextPool.Dequeue();
}
else
{
_nextWriteContext = new WriteContext(this);
}
}
if (socketShutdownSend)
@ -274,9 +288,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
// This is called on the libuv event loop
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
private void OnWriteCompleted(WriteContext writeContext)
{
_log.ConnectionWriteCallback(_connectionId, status);
var bytesWritten = writeContext.ByteCount;
var status = writeContext.WriteStatus;
var error = writeContext.WriteError;
if (error != null)
{
@ -290,6 +307,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
lock (_contextLock)
{
PoolWriteContext(writeContext);
if (_nextWriteContext != null)
{
scheduleWrite = true;
@ -332,11 +350,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
_log.ConnectionWriteCallback(_connectionId, status);
if (scheduleWrite)
{
// ScheduleWrite();
// on right thread, fairness issues?
WriteAllPending();
ScheduleWrite();
}
_tasksCompleted.Clear();
@ -367,6 +385,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
private void PoolWriteContext(WriteContext writeContext)
{
// called inside _contextLock
if (_writeContextPool.Count < _maxPooledWriteContexts)
{
writeContext.Reset();
_writeContextPool.Enqueue(writeContext);
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
@ -412,19 +440,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
private SocketOutput Self;
private UvWriteReq _writeReq;
private MemoryPoolIterator2 _lockedStart;
private MemoryPoolIterator2 _lockedEnd;
private int _bufferCount;
private int _byteCount;
public SocketOutput Self;
public int ByteCount;
public bool SocketShutdownSend;
public bool SocketDisconnect;
public int WriteStatus;
public Exception WriteError;
public int ShutdownSendStatus;
public WriteContext(SocketOutput self)
@ -439,27 +466,40 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
LockWrite();
if (_byteCount == 0 || Self._socket.IsClosed)
if (ByteCount == 0 || Self._socket.IsClosed)
{
DoShutdownIfNeeded();
return;
}
var writeReq = new UvWriteReq(Self._log);
writeReq.Init(Self._thread.Loop);
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
if (Self._writeReqPool.Count > 0)
{
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
_this.WriteError = error;
_this.DoShutdownIfNeeded();
_writeReq = Self._writeReqPool.Dequeue();
}
else
{
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
var writeContext = (WriteContext)state;
writeContext.PoolWriteReq(writeContext._writeReq);
writeContext._writeReq = null;
writeContext.ScheduleReturnFullyWrittenBlocks();
writeContext.WriteStatus = status;
writeContext.WriteError = error;
writeContext.DoShutdownIfNeeded();
}, this);
Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
Self._head = lockedEndBlock;
Self._head.Start = lockedEndIndex;
}
/// <summary>
@ -506,9 +546,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Complete()
{
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
Self.OnWriteCompleted(this);
}
private void PoolWriteReq(UvWriteReq writeReq)
{
if (Self._writeReqPool.Count < MaxPooledWriteReqs)
{
Self._writeReqPool.Enqueue(writeReq);
}
else
{
writeReq.Dispose();
}
}
private void ScheduleReturnFullyWrittenBlocks()
{
var block = _lockedStart.Block;
@ -556,7 +608,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_lockedStart = new MemoryPoolIterator2(head, head.Start);
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);
BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount);
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}
public void Reset()
{
_lockedStart = default(MemoryPoolIterator2);
_lockedEnd = default(MemoryPoolIterator2);
_bufferCount = 0;
ByteCount = 0;
SocketShutdownSend = false;
SocketDisconnect = false;
WriteStatus = 0;
WriteError = null;
ShutdownSendStatus = 0;
}
}
}

View File

@ -18,16 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel
/// </summary>
public class KestrelThread
{
// maximum times the work queues swapped and are processed in a single pass
// as completing a task may immediately have write data to put on the network
// otherwise it needs to wait till the next pass of the libuv loop
private const int _maxLoops = 8;
private static Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
private KestrelEngine _engine;
private readonly IApplicationLifetime _appLifetime;
private Thread _thread;
private UvLoopHandle _loop;
private UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>();
private Queue<Work> _workRunning = new Queue<Work>();
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
private Queue<Work> _workAdding = new Queue<Work>(1024);
private Queue<Work> _workRunning = new Queue<Work>(1024);
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>(256);
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>(256);
private object _workSync = new Object();
private bool _stopImmediate = false;
private bool _initCompleted = false;
@ -249,11 +254,17 @@ namespace Microsoft.AspNet.Server.Kestrel
private void OnPost()
{
DoPostWork();
DoPostCloseHandle();
var loopsRemaining = _maxLoops;
bool wasWork;
do
{
wasWork = DoPostWork();
wasWork = DoPostCloseHandle() || wasWork;
loopsRemaining--;
} while (wasWork && loopsRemaining > 0);
}
private void DoPostWork()
private bool DoPostWork()
{
Queue<Work> queue;
lock (_workSync)
@ -262,6 +273,9 @@ namespace Microsoft.AspNet.Server.Kestrel
_workAdding = _workRunning;
_workRunning = queue;
}
bool wasWork = queue.Count > 0;
while (queue.Count != 0)
{
var work = queue.Dequeue();
@ -286,8 +300,10 @@ namespace Microsoft.AspNet.Server.Kestrel
}
}
}
return wasWork;
}
private void DoPostCloseHandle()
private bool DoPostCloseHandle()
{
Queue<CloseHandle> queue;
lock (_workSync)
@ -296,6 +312,9 @@ namespace Microsoft.AspNet.Server.Kestrel
_closeHandleAdding = _closeHandleRunning;
_closeHandleRunning = queue;
}
bool wasWork = queue.Count > 0;
while (queue.Count != 0)
{
var closeHandle = queue.Dequeue();
@ -309,6 +328,8 @@ namespace Microsoft.AspNet.Server.Kestrel
throw;
}
}
return wasWork;
}
private struct Work

View File

@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
/// </summary>
public class UvWriteReq : UvRequest
{
private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb;
private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status);
private IntPtr _bufs;
@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private object _state;
private const int BUFFER_COUNT = 4;
private List<GCHandle> _pins = new List<GCHandle>();
private List<GCHandle> _pins = new List<GCHandle>(BUFFER_COUNT + 1);
public UvWriteReq(IKestrelTrace logger) : base(logger)
{

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
var bufferSize = 1048576;
@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -146,7 +146,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
@ -227,7 +227,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -304,7 +304,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
// block 1
var start = socketOutput.ProducingStart();