Remove race condition from socket input that could stall reads

- Without the _sync lock, if new data was produced as ConsumingComplete
  was called, the next "await SocketInput" might never complete despite not
  all data being examined.
- If more data is produced afterward, the stall would be prevented, but this
  isn't always the case such as during the end of the request.
This commit is contained in:
Stephen Halter 2016-03-10 09:14:36 -08:00
parent 850632a091
commit 84a68208d0
1 changed files with 82 additions and 72 deletions

View File

@ -28,6 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
private MemoryPoolBlock _pinned;
private int _consumingState;
private object _sync = new object();
public SocketInput(MemoryPool memory, IThreadPool threadPool)
{
@ -58,64 +59,70 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
public void IncomingData(byte[] buffer, int offset, int count)
{
if (count > 0)
lock (_sync)
{
if (_tail == null)
if (count > 0)
{
_tail = _memory.Lease();
if (_tail == null)
{
_tail = _memory.Lease();
}
var iterator = new MemoryPoolIterator(_tail, _tail.End);
iterator.CopyFrom(buffer, offset, count);
if (_head == null)
{
_head = _tail;
}
_tail = iterator.Block;
}
else
{
RemoteIntakeFin = true;
}
var iterator = new MemoryPoolIterator(_tail, _tail.End);
iterator.CopyFrom(buffer, offset, count);
if (_head == null)
{
_head = _tail;
}
_tail = iterator.Block;
Complete();
}
else
{
RemoteIntakeFin = true;
}
Complete();
}
public void IncomingComplete(int count, Exception error)
{
if (_pinned != null)
lock (_sync)
{
_pinned.End += count;
if (_pinned != null)
{
_pinned.End += count;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
}
_pinned = null;
}
_pinned = null;
}
if (count == 0)
{
RemoteIntakeFin = true;
}
if (error != null)
{
_awaitableError = error;
}
if (count == 0)
{
RemoteIntakeFin = true;
Complete();
}
if (error != null)
{
_awaitableError = error;
}
Complete();
}
public void IncomingDeferred()
@ -162,40 +169,43 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
MemoryPoolIterator consumed,
MemoryPoolIterator examined)
{
MemoryPoolBlock returnStart = null;
MemoryPoolBlock returnEnd = null;
if (!consumed.IsDefault)
lock (_sync)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
MemoryPoolBlock returnStart = null;
MemoryPoolBlock returnEnd = null;
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
if (!consumed.IsDefault)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
if (Interlocked.CompareExchange(ref _consumingState, 0, 1) != 1)
{
throw new InvalidOperationException("No ongoing consuming operation to complete.");
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
if (Interlocked.CompareExchange(ref _consumingState, 0, 1) != 1)
{
throw new InvalidOperationException("No ongoing consuming operation to complete.");
}
}
}