From afe944c053c6c127623f5996c19a99e9fdd1f4ad Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Wed, 23 Dec 2015 07:41:34 +0000 Subject: [PATCH] Simplify SocketInput, remove locks, only use pooled blocks --- .../Filter/SocketInputStream.cs | 8 +- .../Http/Connection.cs | 6 +- .../Http/SocketInput.cs | 226 ++++++++---------- .../FrameTests.cs | 4 +- .../TestInput.cs | 7 +- 5 files changed, 109 insertions(+), 142 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/SocketInputStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/SocketInputStream.cs index 26b513faa7..d6cc5748f8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/SocketInputStream.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/SocketInputStream.cs @@ -74,11 +74,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter public override void Write(byte[] buffer, int offset, int count) { - var inputBuffer = _socketInput.IncomingStart(count); - - Buffer.BlockCopy(buffer, offset, inputBuffer.Data.Array, inputBuffer.Data.Offset, count); - - _socketInput.IncomingComplete(count, error: null); + _socketInput.IncomingData(buffer, offset, count); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) @@ -90,7 +86,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter protected override void Dispose(bool disposing) { // Close _socketInput with a fake zero-length write that will result in a zero-length read. - _socketInput.IncomingComplete(0, error: null); + _socketInput.IncomingData(null, 0, 0); base.Dispose(disposing); } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index 89b9db55e8..6f2ea40f27 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -140,11 +140,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize) { - var result = _rawSocketInput.IncomingStart(2048); + var result = _rawSocketInput.IncomingStart(); return handle.Libuv.buf_init( - result.DataPtr, - result.Data.Count); + result.Pin() + result.End, + result.Data.Offset + result.Data.Count - result.End); } private static void ReadCallback(UvStreamHandle handle, int status, object state) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index 5a7a1715d7..3cdbba59c8 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -5,7 +5,6 @@ using System; using System.IO; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Tasks; using Microsoft.AspNet.Server.Kestrel.Infrastructure; namespace Microsoft.AspNet.Server.Kestrel.Http @@ -25,7 +24,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private MemoryPoolBlock2 _head; private MemoryPoolBlock2 _tail; private MemoryPoolBlock2 _pinned; - private readonly object _sync = new Object(); public SocketInput(MemoryPool2 memory, IThreadPool threadPool) { @@ -34,154 +32,99 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _awaitableState = _awaitableIsNotCompleted; } - public ArraySegment Buffer { get; set; } - public bool RemoteIntakeFin { get; set; } - public bool IsCompleted + public bool IsCompleted => (_awaitableState == _awaitableIsCompleted); + + public MemoryPoolBlock2 IncomingStart() { - get + const int minimumSize = 2048; + + if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End) { - return Equals(_awaitableState, _awaitableIsCompleted); + _pinned = _tail; } - } - - public void Skip(int count) - { - Buffer = new ArraySegment(Buffer.Array, Buffer.Offset + count, Buffer.Count - count); - } - - public ArraySegment Take(int count) - { - var taken = new ArraySegment(Buffer.Array, Buffer.Offset, count); - Skip(count); - return taken; - } - - public IncomingBuffer IncomingStart(int minimumSize) - { - lock (_sync) + else { - if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End) + _pinned = _memory.Lease(); + } + + return _pinned; + } + + public void IncomingData(byte[] buffer, int offset, int count) + { + if (count > 0) + { + if (_tail == null) { - _pinned = _tail; - var data = new ArraySegment(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End); - var dataPtr = _pinned.Pin() + _pinned.End; - return new IncomingBuffer - { - Data = data, - DataPtr = dataPtr, - }; + _tail = _memory.Lease(); } + + var iterator = new MemoryPoolIterator2(_tail, _tail.End); + iterator.CopyFrom(buffer, offset, count); + + if (_head == null) + { + _head = _tail; + } + + _tail = iterator.Block; + } + else + { + RemoteIntakeFin = true; } - _pinned = _memory.Lease(minimumSize); - return new IncomingBuffer - { - Data = _pinned.Data, - DataPtr = _pinned.Pin() + _pinned.End - }; + Complete(); } public void IncomingComplete(int count, Exception error) { - Action awaitableState; - - lock (_sync) + // Unpin may called without an earlier Pin + if (_pinned != null) { - // Unpin may called without an earlier Pin - if (_pinned != null) - { - _pinned.Unpin(); - _pinned.End += count; - if (_head == null) - { - _head = _tail = _pinned; - } - else if (_tail == _pinned) - { - // NO-OP: this was a read into unoccupied tail-space - } - else - { - _tail.Next = _pinned; - _tail = _pinned; - } + _pinned.End += count; + + if (_head == null) + { + _head = _tail = _pinned; } + else if (_tail == _pinned) + { + // NO-OP: this was a read into unoccupied tail-space + } + else + { + _tail.Next = _pinned; + _tail = _pinned; + } + _pinned = null; - - if (count == 0) - { - RemoteIntakeFin = true; - } - if (error != null) - { - _awaitableError = error; - } - - awaitableState = Interlocked.Exchange( - ref _awaitableState, - _awaitableIsCompleted); - - _manualResetEvent.Set(); } - if (awaitableState != _awaitableIsCompleted && - awaitableState != _awaitableIsNotCompleted) + if (count == 0) { - _threadPool.Run(awaitableState); + RemoteIntakeFin = true; } - } - - public MemoryPoolIterator2 ConsumingStart() - { - lock (_sync) + if (error != null) { - return new MemoryPoolIterator2(_head); + _awaitableError = error; } - } - public void ConsumingComplete( - MemoryPoolIterator2 consumed, - MemoryPoolIterator2 examined) - { - MemoryPoolBlock2 returnStart = null; - MemoryPoolBlock2 returnEnd = null; - lock (_sync) - { - if (!consumed.IsDefault) - { - returnStart = _head; - returnEnd = consumed.Block; - _head = consumed.Block; - _head.Start = consumed.Index; - } - if (!examined.IsDefault && - examined.IsEnd && - RemoteIntakeFin == false && - _awaitableError == null) - { - _manualResetEvent.Reset(); - - var awaitableState = Interlocked.CompareExchange( - ref _awaitableState, - _awaitableIsNotCompleted, - _awaitableIsCompleted); - } - } - while (returnStart != returnEnd) - { - var returnBlock = returnStart; - returnStart = returnStart.Next; - returnBlock.Pool?.Return(returnBlock); - } + Complete(); } public void AbortAwaiting() { _awaitableError = new ObjectDisposedException(nameof(SocketInput), "The request was aborted"); + Complete(); + } + + private void Complete() + { var awaitableState = Interlocked.Exchange( ref _awaitableState, _awaitableIsCompleted); @@ -195,6 +138,45 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } + public MemoryPoolIterator2 ConsumingStart() + { + return new MemoryPoolIterator2(_head); + } + + public void ConsumingComplete( + MemoryPoolIterator2 consumed, + MemoryPoolIterator2 examined) + { + MemoryPoolBlock2 returnStart = null; + MemoryPoolBlock2 returnEnd = null; + if (!consumed.IsDefault) + { + returnStart = _head; + returnEnd = consumed.Block; + _head = consumed.Block; + _head.Start = consumed.Index; + } + if (!examined.IsDefault && + examined.IsEnd && + RemoteIntakeFin == false && + _awaitableError == null) + { + _manualResetEvent.Reset(); + + var awaitableState = Interlocked.CompareExchange( + ref _awaitableState, + _awaitableIsNotCompleted, + _awaitableIsCompleted); + } + + while (returnStart != returnEnd) + { + var returnBlock = returnStart; + returnStart = returnStart.Next; + returnBlock.Pool.Return(returnBlock); + } + } + public SocketInput GetAwaiter() { return this; @@ -247,11 +229,5 @@ namespace Microsoft.AspNet.Server.Kestrel.Http throw new IOException(error.Message, error); } } - - public struct IncomingBuffer - { - public ArraySegment Data; - public IntPtr DataPtr; - } } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs index cc9bc26618..793084ef55 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs @@ -55,9 +55,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var headerCollection = new FrameRequestHeaders(); var headerArray = Encoding.ASCII.GetBytes(rawHeaders); - var inputBuffer = socketInput.IncomingStart(headerArray.Length); - Buffer.BlockCopy(headerArray, 0, inputBuffer.Data.Array, inputBuffer.Data.Offset, headerArray.Length); - socketInput.IncomingComplete(headerArray.Length, null); + socketInput.IncomingData(headerArray, 0, headerArray.Length); var success = Frame.TakeMessageHeaders(socketInput, headerCollection); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index 827d47ba84..14c3213903 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -29,11 +29,8 @@ namespace Microsoft.AspNet.Server.KestrelTests public void Add(string text, bool fin = false) { - var encoding = System.Text.Encoding.ASCII; - var count = encoding.GetByteCount(text); - var buffer = FrameContext.SocketInput.IncomingStart(text.Length); - count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset); - FrameContext.SocketInput.IncomingComplete(count, null); + var data = System.Text.Encoding.ASCII.GetBytes(text); + FrameContext.SocketInput.IncomingData(data, 0, data.Length); if (fin) { FrameContext.SocketInput.RemoteIntakeFin = true;