diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs index 2e0872f704..e3d2eb5603 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -22,7 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter IKestrelTrace logger) { SocketInput = new SocketInput(memory); - SocketOutput = new StreamSocketOutput(filteredStream); + SocketOutput = new StreamSocketOutput(filteredStream, memory); _log = logger; _filteredStream = filteredStream; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/StreamSocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/StreamSocketOutput.cs index fe9499265f..ba7c57dcf4 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/StreamSocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/StreamSocketOutput.cs @@ -13,10 +13,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter public class StreamSocketOutput : ISocketOutput { private readonly Stream _outputStream; + private readonly MemoryPool2 _memory; + private MemoryPoolBlock2 _producingBlock; - public StreamSocketOutput(Stream outputStream) + public StreamSocketOutput(Stream outputStream, MemoryPool2 memory) { _outputStream = outputStream; + _memory = memory; } void ISocketOutput.Write(ArraySegment buffer, bool immediate) @@ -30,5 +33,27 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter _outputStream.Write(buffer.Array, buffer.Offset, buffer.Count); return TaskUtilities.CompletedTask; } + + public MemoryPoolIterator2 ProducingStart() + { + _producingBlock = _memory.Lease(); + return new MemoryPoolIterator2(_producingBlock); + } + + public void ProducingComplete(MemoryPoolIterator2 end, int count) + { + var block = _producingBlock; + while (block != end.Block) + { + _outputStream.Write(block.Data.Array, block.Data.Offset, block.Data.Count); + + var returnBlock = block; + block = block.Next; + returnBlock.Pool?.Return(returnBlock); + } + + _outputStream.Write(end.Block.Array, end.Block.Data.Offset, end.Index); + end.Block.Pool?.Return(end.Block); + } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index 4af0676761..03c2a619b9 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -42,7 +42,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = Interlocked.Increment(ref _lastConnectionId); _rawSocketInput = new SocketInput(Memory2); - _rawSocketOutput = new SocketOutput(Thread, _socket, this, _connectionId, Log); + _rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log); } public void Start() @@ -116,7 +116,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http // called from a libuv thread. ThreadPool.QueueUserWorkItem(state => { - var connection = (Connection)this; + var connection = (Connection)state; connection._frame.Abort(); }, this); } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ISocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ISocketOutput.cs index 4edffa3055..1ec17d269b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ISocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ISocketOutput.cs @@ -4,6 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel.Infrastructure; namespace Microsoft.AspNet.Server.Kestrel.Http { @@ -14,5 +15,22 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { void Write(ArraySegment buffer, bool immediate = true); Task WriteAsync(ArraySegment buffer, bool immediate = true, CancellationToken cancellationToken = default(CancellationToken)); + + /// + /// Returns an iterator pointing to the tail of the response buffer. Response data can be appended + /// manually or by using . + /// Be careful to ensure all appended blocks are backed by a . + /// + MemoryPoolIterator2 ProducingStart(); + + /// + /// Commits the response data appended to the iterator returned from . + /// All the data up to will be included in the response. + /// A write operation isn't guaranteed to be scheduled unless + /// or is called afterwards. + /// + /// Points to the end of the committed data. + /// The number of bytes added to the response. + void ProducingComplete(MemoryPoolIterator2 end, int count); } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index ae74bffb24..b13d678130 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -64,7 +64,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { _pinned = _tail; var data = new ArraySegment(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End); - var dataPtr = _pinned.Pin(); + var dataPtr = _pinned.Pin() + _pinned.End; return new IncomingBuffer { Data = data, @@ -77,7 +77,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http return new IncomingBuffer { Data = _pinned.Data, - DataPtr = _pinned.Pin() + DataPtr = _pinned.Pin() + _pinned.End }; } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index d1e353388f..be47d8e720 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -23,8 +23,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private readonly long _connectionId; private readonly IKestrelTrace _log; + // This locks all access to _tail, _isProducing and _returnFromOnProducingComplete. + // _head does not require a lock, since it is only used in the ctor and uv thread. + private readonly object _returnLock = new object(); + + private MemoryPoolBlock2 _head; + private MemoryPoolBlock2 _tail; + + private bool _isProducing; + private MemoryPoolBlock2 _returnFromOnProducingComplete; + // This locks access to to all of the below fields - private readonly object _lockObj = new object(); + private readonly object _contextLock = new object(); // The number of write operations that have been scheduled so far // but have not completed. @@ -38,6 +48,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public SocketOutput( KestrelThread thread, UvStreamHandle socket, + MemoryPool2 memory, Connection connection, long connectionId, IKestrelTrace log) @@ -48,6 +59,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connectionId = connectionId; _log = log; _tasksPending = new Queue>(); + + _head = memory.Lease(); + _tail = _head; } public Task WriteAsync( @@ -56,28 +70,20 @@ namespace Microsoft.AspNet.Server.Kestrel.Http bool socketShutdownSend = false, bool socketDisconnect = false) { - //TODO: need buffering that works - if (buffer.Array != null) - { - var copy = new byte[buffer.Count]; - Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count); - buffer = new ArraySegment(copy); - _log.ConnectionWrite(_connectionId, buffer.Count); - } + var tail = ProducingStart(); + tail = tail.CopyFrom(buffer); + // We do our own accounting below + ProducingComplete(tail, count: 0); TaskCompletionSource tcs = null; - lock (_lockObj) + lock (_contextLock) { if (_nextWriteContext == null) { _nextWriteContext = new WriteContext(this); } - if (buffer.Array != null) - { - _nextWriteContext.Buffers.Enqueue(buffer); - } if (socketShutdownSend) { _nextWriteContext.SocketShutdownSend = true; @@ -138,6 +144,58 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + public MemoryPoolIterator2 ProducingStart() + { + lock (_returnLock) + { + Debug.Assert(!_isProducing); + _isProducing = true; + + if (_tail == null) + { + throw new IOException("The socket has been closed."); + } + + return new MemoryPoolIterator2(_tail, _tail.End); + } + } + + public void ProducingComplete(MemoryPoolIterator2 end, int count) + { + lock (_returnLock) + { + Debug.Assert(_isProducing); + _isProducing = false; + + if (_returnFromOnProducingComplete == null) + { + _tail = end.Block; + _tail.End = end.Index; + + if (count != 0) + { + lock (_contextLock) + { + _numBytesPreCompleted += count; + } + } + } + else + { + var block = _returnFromOnProducingComplete; + while (block != null) + { + var returnBlock = block; + block = block.Next; + + returnBlock.Pool?.Return(returnBlock); + } + + _returnFromOnProducingComplete = null; + } + } + } + private void ScheduleWrite() { _thread.Post(_this => _this.WriteAllPending(), this); @@ -148,7 +206,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { WriteContext writingContext; - lock (_lockObj) + lock (_contextLock) { if (_nextWriteContext != null) { @@ -168,7 +226,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } catch { - lock (_lockObj) + lock (_contextLock) { // Lock instead of using Interlocked.Decrement so _writesSending // doesn't change in the middle of executing other synchronized code. @@ -180,7 +238,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } // This is called on the libuv event loop - private void OnWriteCompleted(Queue> writtenBuffers, int status, Exception error) + private void OnWriteCompleted(int bytesWritten, int status, Exception error) { _log.ConnectionWriteCallback(_connectionId, status); @@ -192,7 +250,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _connection.Abort(); } - lock (_lockObj) + lock (_contextLock) { if (_nextWriteContext != null) { @@ -203,13 +261,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _writesPending--; } - foreach (var writeBuffer in writtenBuffers) - { - // _numBytesPreCompleted can temporarily go negative in the event there are - // completed writes that we haven't triggered callbacks for yet. - _numBytesPreCompleted -= writeBuffer.Count; - } - + // _numBytesPreCompleted can temporarily go negative in the event there are + // completed writes that we haven't triggered callbacks for yet. + _numBytesPreCompleted -= bytesWritten; + // bytesLeftToBuffer can be greater than _maxBytesPreCompleted // This allows large writes to complete once they've actually finished. var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; @@ -225,20 +280,48 @@ namespace Microsoft.AspNet.Server.Kestrel.Http if (_lastWriteError == null) { ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetResult(null), + (o) => ((TaskCompletionSource)o).SetResult(null), tcs); } else { // error is closure captured ThreadPool.QueueUserWorkItem( - (o) => ((TaskCompletionSource)o).SetException(_lastWriteError), + (o) => ((TaskCompletionSource)o).SetException(_lastWriteError), tcs); } } + } + } - // Now that the while loop has completed the following invariants should hold true: - Debug.Assert(_numBytesPreCompleted >= 0); + // This is called on the libuv event loop + private void ReturnAllBlocks() + { + lock (_returnLock) + { + var block = _head; + while (block != _tail) + { + var returnBlock = block; + block = block.Next; + + returnBlock.Unpin(); + returnBlock.Pool?.Return(returnBlock); + } + + _tail.Unpin(); + + if (_isProducing) + { + _returnFromOnProducingComplete = _tail; + } + else + { + _tail.Pool?.Return(_tail); + } + + _head = null; + _tail = null; } } @@ -263,9 +346,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private class WriteContext { + private MemoryPoolIterator2 _lockedStart; + private MemoryPoolIterator2 _lockedEnd; + private int _bufferCount; + private int _byteCount; + public SocketOutput Self; - public Queue> Buffers; public bool SocketShutdownSend; public bool SocketDisconnect; @@ -277,7 +364,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public WriteContext(SocketOutput self) { Self = self; - Buffers = new Queue>(); } /// @@ -285,30 +371,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http /// public void DoWriteIfNeeded() { - if (Buffers.Count == 0 || Self._socket.IsClosed) + LockWrite(); + + if (_byteCount == 0 || Self._socket.IsClosed) { DoShutdownIfNeeded(); return; } - var buffers = new ArraySegment[Buffers.Count]; - - var i = 0; - foreach (var buffer in Buffers) - { - buffers[i++] = buffer; - } - var writeReq = new UvWriteReq(Self._log); writeReq.Init(Self._thread.Loop); - writeReq.Write(Self._socket, new ArraySegment>(buffers), (_writeReq, status, error, state) => + writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { _writeReq.Dispose(); var _this = (WriteContext)state; + _this.ReturnFullyWrittenBlocks(); _this.WriteStatus = status; _this.WriteError = error; _this.DoShutdownIfNeeded(); }, this); + + Self._head = _lockedEnd.Block; + Self._head.Start = _lockedEnd.Index; } /// @@ -348,13 +432,62 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } Self._socket.Dispose(); + Self.ReturnAllBlocks(); Self._log.ConnectionStop(Self._connectionId); Complete(); } public void Complete() { - Self.OnWriteCompleted(Buffers, WriteStatus, WriteError); + Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError); + } + + private void ReturnFullyWrittenBlocks() + { + var block = _lockedStart.Block; + while (block != _lockedEnd.Block) + { + var returnBlock = block; + block = block.Next; + + returnBlock.Unpin(); + returnBlock.Pool?.Return(returnBlock); + } + } + + private void LockWrite() + { + var head = Self._head; + var tail = Self._tail; + + if (head == null || tail == null) + { + // ReturnAllBlocks has already bee called. Nothing to do here. + // Write will no-op since _byteCount will remain 0. + return; + } + + _lockedStart = new MemoryPoolIterator2(head, head.Start); + _lockedEnd = new MemoryPoolIterator2(tail, tail.End); + + if (_lockedStart.Block == _lockedEnd.Block) + { + _byteCount = _lockedEnd.Index - _lockedStart.Index; + _bufferCount = 1; + return; + } + + _byteCount = _lockedStart.Block.Data.Offset + _lockedStart.Block.Data.Count - _lockedStart.Index; + _bufferCount = 1; + + for (var block = _lockedStart.Block.Next; block != _lockedEnd.Block; block = block.Next) + { + _byteCount += block.Data.Count; + _bufferCount++; + } + + _byteCount += _lockedEnd.Index - _lockedEnd.Block.Data.Offset; + _bufferCount++; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs index 37d64ddcaf..446f03c2b3 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs @@ -59,7 +59,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure /// The block returned must be at least this size. It may be larger than this minimum size, and if so, /// the caller may write to the block's entire size rather than being limited to the minumumSize requested. /// The block that is reserved for the called. It must be passed to Return when it is no longer being used. - public MemoryPoolBlock2 Lease(int minimumSize) + public MemoryPoolBlock2 Lease(int minimumSize = _blockLength) { if (minimumSize > _blockLength) { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs index a93e03186d..2bcd21d9a2 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs @@ -98,24 +98,27 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure } /// - /// Called to ensure that a block is pinned, and return the pointer to native memory just after - /// the range of "active" bytes. This is where arriving data is read into. + /// Called to ensure that a block is pinned, and return the pointer to the native address + /// of the first byte of this block's Data memory. Arriving data is read into Pin() + End. + /// Outgoing data is read from Pin() + Start. /// /// public IntPtr Pin() { - Debug.Assert(!_pinHandle.IsAllocated); - if (_dataArrayPtr != IntPtr.Zero) { // this is a slab managed block - use the native address of the slab which is always locked - return _dataArrayPtr + End; + return _dataArrayPtr; + } + else if (_pinHandle.IsAllocated) + { + return _pinHandle.AddrOfPinnedObject(); } else { // this is one-time-use memory - lock the managed memory until Unpin is called _pinHandle = GCHandle.Alloc(Data.Array, GCHandleType.Pinned); - return _pinHandle.AddrOfPinnedObject() + End; + return _pinHandle.AddrOfPinnedObject(); } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs index 7bf514707e..6c5c1f205a 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.Linq; using System.Numerics; @@ -530,5 +531,46 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure } } } + + public MemoryPoolIterator2 CopyFrom(ArraySegment buffer) + { + Debug.Assert(_block != null); + Debug.Assert(_block.Pool != null); + Debug.Assert(_block.Next == null); + Debug.Assert(_block.End == _index); + + var pool = _block.Pool; + var block = _block; + var blockIndex = _index; + + var bufferIndex = buffer.Offset; + var remaining = buffer.Count; + + while (remaining > 0) + { + var bytesLeftInBlock = block.Data.Offset + block.Data.Count - blockIndex; + + if (bytesLeftInBlock == 0) + { + var nextBlock = pool.Lease(); + block.Next = nextBlock; + block = nextBlock; + + blockIndex = block.Data.Offset; + bytesLeftInBlock = block.Data.Count; + } + + var bytesToCopy = Math.Min(remaining, bytesLeftInBlock); + + Buffer.BlockCopy(buffer.Array, bufferIndex, block.Array, blockIndex, bytesToCopy); + + blockIndex += bytesToCopy; + bufferIndex += bytesToCopy; + remaining -= bytesToCopy; + block.End = blockIndex; + } + + return new MemoryPoolIterator2(block, blockIndex); + } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 103ddfc6ee..7330ef3c15 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -41,7 +41,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking public unsafe void Write( UvStreamHandle handle, - ArraySegment> bufs, + MemoryPoolIterator2 start, + MemoryPoolIterator2 end, + int nBuffers, Action callback, object state) { @@ -51,7 +53,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); var pBuffers = (Libuv.uv_buf_t*)_bufs; - var nBuffers = bufs.Count; if (nBuffers > BUFFER_COUNT) { // create and pin buffer array when it's larger than the pre-allocated one @@ -61,16 +62,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); } + var block = start.Block; for (var index = 0; index < nBuffers; index++) { - // create and pin each segment being written - var buf = bufs.Array[bufs.Offset + index]; + var blockStart = block == start.Block ? start.Index : block.Data.Offset; + var blockEnd = block == end.Block ? end.Index : block.Data.Offset + block.Data.Count; - var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); - _pins.Add(gcHandle); + // create and pin each segment being written pBuffers[index] = Libuv.buf_init( - gcHandle.AddrOfPinnedObject() + buf.Offset, - buf.Count); + block.Pin() + blockStart, + blockEnd - blockStart); + + block = block.Next; } _callback = callback; diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs index db3b03802f..5f445ef487 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using Microsoft.AspNet.Server.Kestrel.Infrastructure; using Xunit; @@ -152,6 +153,38 @@ namespace Microsoft.AspNet.Server.KestrelTests } } + [Fact] + public void CopyFromCorrectlyTraversesBlocks() + { + using (var pool = new MemoryPool2()) + { + var block1 = pool.Lease(128); + var iterator = block1.GetIterator(); + var bufferSize = block1.Data.Count * 3; + var buffer = new byte[bufferSize]; + + for (int i = 0; i < bufferSize; i++) + { + buffer[i] = (byte)(i % 73); + } + + Assert.Null(block1.Next); + + var end = iterator.CopyFrom(new ArraySegment(buffer)); + + Assert.NotNull(block1.Next); + + for (int i = 0; i < bufferSize; i++) + { + Assert.Equal(i % 73, iterator.Take()); + } + + Assert.Equal(-1, iterator.Take()); + Assert.Equal(iterator.Block, end.Block); + Assert.Equal(iterator.Index, end.Index); + } + } + [Fact] public void IsEndCorrectlyTraversesBlocks() { diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs index f57c899f6a..44a5757f80 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs @@ -62,14 +62,24 @@ namespace Microsoft.AspNet.Server.KestrelTests var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); writeRequest.Init(loop); + var block = MemoryPoolBlock2.Create( + new ArraySegment(new byte[] { 1, 2, 3, 4 }), + dataPtr: IntPtr.Zero, + pool: null, + slab: null); + var start = new MemoryPoolIterator2(block, 0); + var end = new MemoryPoolIterator2(block, block.Data.Count); writeRequest.Write( serverConnectionPipe, - new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), + start, + end, + 1, (_3, status2, error2, _4) => { writeRequest.Dispose(); serverConnectionPipe.Dispose(); serverListenPipe.Dispose(); + block.Unpin(); }, null); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs index fa8a3ada30..edaa50870e 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs @@ -201,12 +201,22 @@ namespace Microsoft.AspNet.Server.KestrelTests { var req = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); req.Init(loop); + var block = MemoryPoolBlock2.Create( + new ArraySegment(new byte[] { 65, 66, 67, 68, 69 }), + dataPtr: IntPtr.Zero, + pool: null, + slab: null); + var start = new MemoryPoolIterator2(block, 0); + var end = new MemoryPoolIterator2(block, block.Data.Count); req.Write( tcp2, - new ArraySegment>( - new[] { new ArraySegment(new byte[] { 65, 66, 67, 68, 69 }) } - ), - (_1, _2, _3, _4) => { }, + start, + end, + 1, + (_1, _2, _3, _4) => + { + block.Unpin(); + }, null); } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 69224073dc..629c35407c 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -34,13 +34,14 @@ namespace Microsoft.AspNet.Server.KestrelTests }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + using (var memory = new MemoryPool2()) { kestrelEngine.Start(count: 1); var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, null, 0, trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. var bufferSize = 1048576; @@ -79,13 +80,14 @@ namespace Microsoft.AspNet.Server.KestrelTests }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + using (var memory = new MemoryPool2()) { kestrelEngine.Start(count: 1); var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, null, 0, trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -134,13 +136,14 @@ namespace Microsoft.AspNet.Server.KestrelTests }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + using (var memory = new MemoryPool2()) { kestrelEngine.Start(count: 1); var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, null, 0, trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); var bufferSize = maxBytesPreCompleted; @@ -213,13 +216,14 @@ namespace Microsoft.AspNet.Server.KestrelTests }; using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + using (var memory = new MemoryPool2()) { kestrelEngine.Start(count: 1); var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, null, 0, trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); var bufferSize = maxBytesPreCompleted; var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); @@ -270,6 +274,57 @@ namespace Microsoft.AspNet.Server.KestrelTests } } + [Fact] + public void ProducingStartAndProducingCompleteCanBeUsedDirectly() + { + int nBuffers = 0; + var nBufferWh = new ManualResetEventSlim(); + + var mockLibuv = new MockLibuv + { + OnWrite = (socket, buffers, triggerCompleted) => + { + nBuffers = buffers; + nBufferWh.Set(); + triggerCompleted(0); + return 0; + } + }; + + using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + using (var memory = new MemoryPool2()) + { + kestrelEngine.Start(count: 1); + + var kestrelThread = kestrelEngine.Threads[0]; + var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); + var trace = new KestrelTrace(new TestKestrelTrace()); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace); + + // block 1 + var start = socketOutput.ProducingStart(); + start.Block.End = start.Block.Data.Offset + start.Block.Data.Count; + var totalBytes = start.Block.Data.Count; + + // block 2 + var block2 = memory.Lease(); + block2.End = block2.Data.Offset + block2.Data.Count; + start.Block.Next = block2; + totalBytes += block2.Data.Count; + + var end = new MemoryPoolIterator2(block2, block2.End); + + socketOutput.ProducingComplete(end, totalBytes); + + // A call to Write is required to ensure a write is scheduled + socketOutput.WriteAsync(default(ArraySegment)); + + Assert.True(nBufferWh.Wait(1000)); + Assert.Equal(2, nBuffers); + } + } + + private class MockSocket : UvStreamHandle { public MockSocket(int threadId, IKestrelTrace logger) : base(logger) diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibuv.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibuv.cs index 1134ae8fab..9a05659186 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibuv.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibuv.cs @@ -12,7 +12,7 @@ namespace Microsoft.AspNet.Server.KestrelTests.TestHelpers private bool _stopLoop; private readonly ManualResetEventSlim _loopWh = new ManualResetEventSlim(); - private Func>, Action, int> _onWrite; + private Func, int> _onWrite; unsafe public MockLibuv() { @@ -68,7 +68,7 @@ namespace Microsoft.AspNet.Server.KestrelTests.TestHelpers _uv_walk = (loop, callback, ignore) => 0; } - public Func>, Action, int> OnWrite + public Func, int> OnWrite { get { @@ -82,7 +82,7 @@ namespace Microsoft.AspNet.Server.KestrelTests.TestHelpers unsafe private int UvWrite(UvRequest req, UvStreamHandle handle, uv_buf_t* bufs, int nbufs, uv_write_cb cb) { - return _onWrite(handle, new ArraySegment>(), status => cb(req.InternalGetHandle(), status)); + return _onWrite(handle, nbufs, status => cb(req.InternalGetHandle(), status)); } } }