diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index b49ffd4316..89b9db55e8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); _rawSocketInput = new SocketInput(Memory2, ThreadPool); - _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool); } public void Start() diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index 6721756266..5d8c4e8d0d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -94,6 +94,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { var socket = (Listener)tcs2.Task.AsyncState; socket.ListenSocket.Dispose(); + + var writeReqPool = socket.WriteReqPool; + while (writeReqPool.Count > 0) + { + writeReqPool.Dequeue().Dispose(); + } + tcs2.SetResult(0); } catch (Exception ex) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs index fecd0afa44..7247afa2bd 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs @@ -1,8 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. -using Microsoft.AspNet.Http; +using System.Collections.Generic; using Microsoft.AspNet.Server.Kestrel.Infrastructure; +using Microsoft.AspNet.Server.Kestrel.Networking; namespace Microsoft.AspNet.Server.Kestrel.Http { @@ -17,6 +18,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http : base(serviceContext) { Memory2 = new MemoryPool2(); + WriteReqPool = new Queue(SocketOutput.MaxPooledWriteReqs); } public ListenerContext(ListenerContext listenerContext) @@ -25,6 +27,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Memory2 = listenerContext.Memory2; + WriteReqPool = listenerContext.WriteReqPool; Log = listenerContext.Log; } @@ -33,5 +36,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public KestrelThread Thread { get; set; } public MemoryPool2 Memory2 { get; set; } + + public Queue WriteReqPool { get; set; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 71675e5513..39dcf3c2c8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -14,10 +14,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { + public const int MaxPooledWriteReqs = 1024; + private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; - private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); @@ -39,7 +40,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // This locks access to to all of the below fields private readonly object _contextLock = new object(); - private bool _isDisposed = false; // The number of write operations that have been scheduled so far // but have not completed. @@ -50,7 +50,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; - private readonly Queue _writeContextPool; + private readonly Queue _writeReqPool; public SocketOutput( KestrelThread thread, @@ -59,7 +59,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http Connection connection, long connectionId, IKestrelTrace log, - IThreadPool threadPool) + IThreadPool threadPool, + Queue writeReqPool) { _thread = thread; _socket = socket; @@ -69,7 +70,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); - _writeContextPool = new Queue(_maxPooledWriteContexts); + _writeReqPool = writeReqPool; _head = memory.Lease(); _tail = _head; @@ -96,14 +97,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { if (_nextWriteContext == null) { - if (_writeContextPool.Count > 0) - { - _nextWriteContext = _writeContextPool.Dequeue(); - } - else - { - _nextWriteContext = new WriteContext(this); - } + _nextWriteContext = new WriteContext(this); } if (socketShutdownSend) @@ -304,7 +298,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http lock (_contextLock) { - PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -382,32 +375,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } - private void PoolWriteContext(WriteContext writeContext) - { - // called inside _contextLock - if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts) - { - writeContext.Reset(); - _writeContextPool.Enqueue(writeContext); - } - else - { - writeContext.Dispose(); - } - } - - private void Dispose() - { - lock (_contextLock) - { - _isDisposed = true; - while (_writeContextPool.Count > 0) - { - _writeContextPool.Dequeue().Dispose(); - } - } - } - void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -449,7 +416,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http buffers++; } - private class WriteContext : IDisposable + private class WriteContext { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); @@ -473,8 +440,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public WriteContext(SocketOutput self) { Self = self; - _writeReq = new UvWriteReq(Self._log); - _writeReq.Init(Self._thread.Loop); } /// @@ -495,13 +460,24 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var lockedEndBlock = _lockedEnd.Block; var lockedEndIndex = _lockedEnd.Index; + if (Self._writeReqPool.Count > 0) + { + _writeReq = Self._writeReqPool.Dequeue(); + } + else + { + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); + } + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { - var _this = (WriteContext)state; - _this.ScheduleReturnFullyWrittenBlocks(); - _this.WriteStatus = status; - _this.WriteError = error; - _this.DoShutdownIfNeeded(); + var writeContext = (WriteContext)state; + writeContext.PoolWriteReq(writeContext._writeReq); + writeContext.ScheduleReturnFullyWrittenBlocks(); + writeContext.WriteStatus = status; + writeContext.WriteError = error; + writeContext.DoShutdownIfNeeded(); }, this); Self._head = lockedEndBlock; @@ -545,14 +521,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else if (Self._socket.IsClosed) { - Self.Dispose(); Complete(); return; } Self._socket.Dispose(); Self.ReturnAllBlocks(); - Self.Dispose(); Self._log.ConnectionStop(Self._connectionId); Complete(); } @@ -561,7 +535,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { Self.OnWriteCompleted(this); } - + + private void PoolWriteReq(UvWriteReq writeReq) + { + if (Self._writeReqPool.Count < MaxPooledWriteReqs) + { + Self._writeReqPool.Enqueue(writeReq); + } + else + { + writeReq.Dispose(); + } + } + private void ScheduleReturnFullyWrittenBlocks() { var block = _lockedStart.Block; @@ -608,30 +594,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); - + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); } - - public void Reset() - { - _lockedStart = default(MemoryPoolIterator2); - _lockedEnd = default(MemoryPoolIterator2); - _bufferCount = 0; - ByteCount = 0; - - SocketShutdownSend = false; - SocketDisconnect = false; - - WriteStatus = 0; - WriteError = null; - - ShutdownSendStatus = 0; - } - - public void Dispose() - { - _writeReq.Dispose(); - } } } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index d5f86b76aa..bda6bfb651 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -89,7 +89,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -146,7 +146,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; @@ -227,7 +227,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -304,7 +304,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); var ltp = new LoggingThreadPool(trace); - var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue()); // block 1 var start = socketOutput.ProducingStart();