Simplify SocketInput, remove locks, only use pooled blocks

This commit is contained in:
Ben Adams 2015-12-23 07:41:34 +00:00
parent 4f8ec86b54
commit afe944c053
5 changed files with 109 additions and 142 deletions

View File

@ -74,11 +74,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
public override void Write(byte[] buffer, int offset, int count)
{
var inputBuffer = _socketInput.IncomingStart(count);
Buffer.BlockCopy(buffer, offset, inputBuffer.Data.Array, inputBuffer.Data.Offset, count);
_socketInput.IncomingComplete(count, error: null);
_socketInput.IncomingData(buffer, offset, count);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
@ -90,7 +86,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
protected override void Dispose(bool disposing)
{
// Close _socketInput with a fake zero-length write that will result in a zero-length read.
_socketInput.IncomingComplete(0, error: null);
_socketInput.IncomingData(null, 0, 0);
base.Dispose(disposing);
}
}

View File

@ -140,11 +140,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = _rawSocketInput.IncomingStart(2048);
var result = _rawSocketInput.IncomingStart();
return handle.Libuv.buf_init(
result.DataPtr,
result.Data.Count);
result.Pin() + result.End,
result.Data.Offset + result.Data.Count - result.End);
}
private static void ReadCallback(UvStreamHandle handle, int status, object state)

View File

@ -5,7 +5,6 @@ using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
namespace Microsoft.AspNet.Server.Kestrel.Http
@ -25,7 +24,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private MemoryPoolBlock2 _head;
private MemoryPoolBlock2 _tail;
private MemoryPoolBlock2 _pinned;
private readonly object _sync = new Object();
public SocketInput(MemoryPool2 memory, IThreadPool threadPool)
{
@ -34,154 +32,99 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_awaitableState = _awaitableIsNotCompleted;
}
public ArraySegment<byte> Buffer { get; set; }
public bool RemoteIntakeFin { get; set; }
public bool IsCompleted
public bool IsCompleted => (_awaitableState == _awaitableIsCompleted);
public MemoryPoolBlock2 IncomingStart()
{
get
const int minimumSize = 2048;
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
{
return Equals(_awaitableState, _awaitableIsCompleted);
_pinned = _tail;
}
}
public void Skip(int count)
{
Buffer = new ArraySegment<byte>(Buffer.Array, Buffer.Offset + count, Buffer.Count - count);
}
public ArraySegment<byte> Take(int count)
{
var taken = new ArraySegment<byte>(Buffer.Array, Buffer.Offset, count);
Skip(count);
return taken;
}
public IncomingBuffer IncomingStart(int minimumSize)
{
lock (_sync)
else
{
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
_pinned = _memory.Lease();
}
return _pinned;
}
public void IncomingData(byte[] buffer, int offset, int count)
{
if (count > 0)
{
if (_tail == null)
{
_pinned = _tail;
var data = new ArraySegment<byte>(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End);
var dataPtr = _pinned.Pin() + _pinned.End;
return new IncomingBuffer
{
Data = data,
DataPtr = dataPtr,
};
_tail = _memory.Lease();
}
var iterator = new MemoryPoolIterator2(_tail, _tail.End);
iterator.CopyFrom(buffer, offset, count);
if (_head == null)
{
_head = _tail;
}
_tail = iterator.Block;
}
else
{
RemoteIntakeFin = true;
}
_pinned = _memory.Lease(minimumSize);
return new IncomingBuffer
{
Data = _pinned.Data,
DataPtr = _pinned.Pin() + _pinned.End
};
Complete();
}
public void IncomingComplete(int count, Exception error)
{
Action awaitableState;
lock (_sync)
// Unpin may called without an earlier Pin
if (_pinned != null)
{
// Unpin may called without an earlier Pin
if (_pinned != null)
{
_pinned.Unpin();
_pinned.End += count;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
}
_pinned.End += count;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
}
_pinned = null;
if (count == 0)
{
RemoteIntakeFin = true;
}
if (error != null)
{
_awaitableError = error;
}
awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
_manualResetEvent.Set();
}
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
if (count == 0)
{
_threadPool.Run(awaitableState);
RemoteIntakeFin = true;
}
}
public MemoryPoolIterator2 ConsumingStart()
{
lock (_sync)
if (error != null)
{
return new MemoryPoolIterator2(_head);
_awaitableError = error;
}
}
public void ConsumingComplete(
MemoryPoolIterator2 consumed,
MemoryPoolIterator2 examined)
{
MemoryPoolBlock2 returnStart = null;
MemoryPoolBlock2 returnEnd = null;
lock (_sync)
{
if (!consumed.IsDefault)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
}
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool?.Return(returnBlock);
}
Complete();
}
public void AbortAwaiting()
{
_awaitableError = new ObjectDisposedException(nameof(SocketInput), "The request was aborted");
Complete();
}
private void Complete()
{
var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
@ -195,6 +138,45 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
public MemoryPoolIterator2 ConsumingStart()
{
return new MemoryPoolIterator2(_head);
}
public void ConsumingComplete(
MemoryPoolIterator2 consumed,
MemoryPoolIterator2 examined)
{
MemoryPoolBlock2 returnStart = null;
MemoryPoolBlock2 returnEnd = null;
if (!consumed.IsDefault)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
}
public SocketInput GetAwaiter()
{
return this;
@ -247,11 +229,5 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
throw new IOException(error.Message, error);
}
}
public struct IncomingBuffer
{
public ArraySegment<byte> Data;
public IntPtr DataPtr;
}
}
}

View File

@ -55,9 +55,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var headerCollection = new FrameRequestHeaders();
var headerArray = Encoding.ASCII.GetBytes(rawHeaders);
var inputBuffer = socketInput.IncomingStart(headerArray.Length);
Buffer.BlockCopy(headerArray, 0, inputBuffer.Data.Array, inputBuffer.Data.Offset, headerArray.Length);
socketInput.IncomingComplete(headerArray.Length, null);
socketInput.IncomingData(headerArray, 0, headerArray.Length);
var success = Frame.TakeMessageHeaders(socketInput, headerCollection);

View File

@ -29,11 +29,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
public void Add(string text, bool fin = false)
{
var encoding = System.Text.Encoding.ASCII;
var count = encoding.GetByteCount(text);
var buffer = FrameContext.SocketInput.IncomingStart(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset);
FrameContext.SocketInput.IncomingComplete(count, null);
var data = System.Text.Encoding.ASCII.GetBytes(text);
FrameContext.SocketInput.IncomingData(data, 0, data.Length);
if (fin)
{
FrameContext.SocketInput.RemoteIntakeFin = true;