Pool UvWriteReqs instead of SocketOutput.WriteContexts

- This allows all connections accepted by the same thread to share a pool
This commit is contained in:
Stephen Halter 2015-12-09 17:12:45 -08:00 committed by Ben Adams
parent 992664e0dc
commit 3e42904096
5 changed files with 57 additions and 80 deletions

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_connectionId = Interlocked.Increment(ref _lastConnectionId);
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
}
public void Start()

View File

@ -94,6 +94,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
var socket = (Listener)tcs2.Task.AsyncState;
socket.ListenSocket.Dispose();
var writeReqPool = socket.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}
tcs2.SetResult(0);
}
catch (Exception ex)

View File

@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Http;
using System.Collections.Generic;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
@ -17,6 +18,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
: base(serviceContext)
{
Memory2 = new MemoryPool2();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
}
public ListenerContext(ListenerContext listenerContext)
@ -25,6 +27,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Memory2 = listenerContext.Memory2;
WriteReqPool = listenerContext.WriteReqPool;
Log = listenerContext.Log;
}
@ -33,5 +36,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public KestrelThread Thread { get; set; }
public MemoryPool2 Memory2 { get; set; }
public Queue<UvWriteReq> WriteReqPool { get; set; }
}
}

View File

@ -14,10 +14,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketOutput : ISocketOutput
{
public const int MaxPooledWriteReqs = 1024;
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;
private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
@ -39,7 +40,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// This locks access to to all of the below fields
private readonly object _contextLock = new object();
private bool _isDisposed = false;
// The number of write operations that have been scheduled so far
// but have not completed.
@ -50,7 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<WriteContext> _writeContextPool;
private readonly Queue<UvWriteReq> _writeReqPool;
public SocketOutput(
KestrelThread thread,
@ -59,7 +59,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Connection connection,
long connectionId,
IKestrelTrace log,
IThreadPool threadPool)
IThreadPool threadPool,
Queue<UvWriteReq> writeReqPool)
{
_thread = thread;
_socket = socket;
@ -69,7 +70,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = writeReqPool;
_head = memory.Lease();
_tail = _head;
@ -96,14 +97,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
if (_nextWriteContext == null)
{
if (_writeContextPool.Count > 0)
{
_nextWriteContext = _writeContextPool.Dequeue();
}
else
{
_nextWriteContext = new WriteContext(this);
}
_nextWriteContext = new WriteContext(this);
}
if (socketShutdownSend)
@ -304,7 +298,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
lock (_contextLock)
{
PoolWriteContext(writeContext);
if (_nextWriteContext != null)
{
scheduleWrite = true;
@ -382,32 +375,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
private void PoolWriteContext(WriteContext writeContext)
{
// called inside _contextLock
if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts)
{
writeContext.Reset();
_writeContextPool.Enqueue(writeContext);
}
else
{
writeContext.Dispose();
}
}
private void Dispose()
{
lock (_contextLock)
{
_isDisposed = true;
while (_writeContextPool.Count > 0)
{
_writeContextPool.Dequeue().Dispose();
}
}
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
@ -449,7 +416,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
buffers++;
}
private class WriteContext : IDisposable
private class WriteContext
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
@ -473,8 +440,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public WriteContext(SocketOutput self)
{
Self = self;
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}
/// <summary>
@ -495,13 +460,24 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
if (Self._writeReqPool.Count > 0)
{
_writeReq = Self._writeReqPool.Dequeue();
}
else
{
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
_this.WriteError = error;
_this.DoShutdownIfNeeded();
var writeContext = (WriteContext)state;
writeContext.PoolWriteReq(writeContext._writeReq);
writeContext.ScheduleReturnFullyWrittenBlocks();
writeContext.WriteStatus = status;
writeContext.WriteError = error;
writeContext.DoShutdownIfNeeded();
}, this);
Self._head = lockedEndBlock;
@ -545,14 +521,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
else if (Self._socket.IsClosed)
{
Self.Dispose();
Complete();
return;
}
Self._socket.Dispose();
Self.ReturnAllBlocks();
Self.Dispose();
Self._log.ConnectionStop(Self._connectionId);
Complete();
}
@ -561,7 +535,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
Self.OnWriteCompleted(this);
}
private void PoolWriteReq(UvWriteReq writeReq)
{
if (Self._writeReqPool.Count < MaxPooledWriteReqs)
{
Self._writeReqPool.Enqueue(writeReq);
}
else
{
writeReq.Dispose();
}
}
private void ScheduleReturnFullyWrittenBlocks()
{
var block = _lockedStart.Block;
@ -608,30 +594,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_lockedStart = new MemoryPoolIterator2(head, head.Start);
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}
public void Reset()
{
_lockedStart = default(MemoryPoolIterator2);
_lockedEnd = default(MemoryPoolIterator2);
_bufferCount = 0;
ByteCount = 0;
SocketShutdownSend = false;
SocketDisconnect = false;
WriteStatus = 0;
WriteError = null;
ShutdownSendStatus = 0;
}
public void Dispose()
{
_writeReq.Dispose();
}
}
}
}

View File

@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
var bufferSize = 1048576;
@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -146,7 +146,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
@ -227,7 +227,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
var bufferSize = maxBytesPreCompleted;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@ -304,7 +304,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
// block 1
var start = socketOutput.ProducingStart();