Use TryEnter to acquire SocketOutput._contextLock on the libuv event loop
This commit is contained in:
parent
e90b61e6c5
commit
df695accb0
|
|
@ -9,6 +9,7 @@ using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
|
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
|
||||||
using Microsoft.AspNet.Server.Kestrel.Networking;
|
using Microsoft.AspNet.Server.Kestrel.Networking;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Microsoft.AspNet.Server.Kestrel.Http
|
namespace Microsoft.AspNet.Server.Kestrel.Http
|
||||||
{
|
{
|
||||||
|
|
@ -278,7 +279,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is called on the libuv event loop
|
// This may called on the libuv event loop
|
||||||
|
// This is always called with the _contextLock already acquired
|
||||||
private void OnWriteCompleted(WriteContext writeContext)
|
private void OnWriteCompleted(WriteContext writeContext)
|
||||||
{
|
{
|
||||||
var bytesWritten = writeContext.ByteCount;
|
var bytesWritten = writeContext.ByteCount;
|
||||||
|
|
@ -294,28 +296,25 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
||||||
_connection.Abort();
|
_connection.Abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
lock (_contextLock)
|
PoolWriteContext(writeContext);
|
||||||
|
|
||||||
|
// _numBytesPreCompleted can temporarily go negative in the event there are
|
||||||
|
// completed writes that we haven't triggered callbacks for yet.
|
||||||
|
_numBytesPreCompleted -= bytesWritten;
|
||||||
|
|
||||||
|
// bytesLeftToBuffer can be greater than _maxBytesPreCompleted
|
||||||
|
// This allows large writes to complete once they've actually finished.
|
||||||
|
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
|
||||||
|
while (_tasksPending.Count > 0 &&
|
||||||
|
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer)
|
||||||
{
|
{
|
||||||
PoolWriteContext(writeContext);
|
var tcs = _tasksPending.Dequeue();
|
||||||
|
var bytesToWrite = (int)tcs.Task.AsyncState;
|
||||||
|
|
||||||
// _numBytesPreCompleted can temporarily go negative in the event there are
|
_numBytesPreCompleted += bytesToWrite;
|
||||||
// completed writes that we haven't triggered callbacks for yet.
|
bytesLeftToBuffer -= bytesToWrite;
|
||||||
_numBytesPreCompleted -= bytesWritten;
|
|
||||||
|
|
||||||
// bytesLeftToBuffer can be greater than _maxBytesPreCompleted
|
_tasksCompleted.Enqueue(tcs);
|
||||||
// This allows large writes to complete once they've actually finished.
|
|
||||||
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
|
|
||||||
while (_tasksPending.Count > 0 &&
|
|
||||||
(int)(_tasksPending.Peek().Task.AsyncState) <= bytesLeftToBuffer)
|
|
||||||
{
|
|
||||||
var tcs = _tasksPending.Dequeue();
|
|
||||||
var bytesToWrite = (int)tcs.Task.AsyncState;
|
|
||||||
|
|
||||||
_numBytesPreCompleted += bytesToWrite;
|
|
||||||
bytesLeftToBuffer -= bytesToWrite;
|
|
||||||
|
|
||||||
_tasksCompleted.Enqueue(tcs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (_tasksCompleted.Count > 0)
|
while (_tasksCompleted.Count > 0)
|
||||||
|
|
@ -414,6 +413,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
||||||
private class WriteContext
|
private class WriteContext
|
||||||
{
|
{
|
||||||
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
|
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
|
||||||
|
private static WaitCallback _completeWrite = (state) => ((WriteContext)state).CompleteOnThreadPool();
|
||||||
|
|
||||||
private SocketOutput Self;
|
private SocketOutput Self;
|
||||||
private UvWriteReq _writeReq;
|
private UvWriteReq _writeReq;
|
||||||
|
|
@ -509,19 +509,48 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
|
||||||
{
|
{
|
||||||
if (SocketDisconnect == false || Self._socket.IsClosed)
|
if (SocketDisconnect == false || Self._socket.IsClosed)
|
||||||
{
|
{
|
||||||
Complete();
|
CompleteOnUvThread();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Self._socket.Dispose();
|
Self._socket.Dispose();
|
||||||
Self.ReturnAllBlocks();
|
Self.ReturnAllBlocks();
|
||||||
Self._log.ConnectionStop(Self._connectionId);
|
Self._log.ConnectionStop(Self._connectionId);
|
||||||
Complete();
|
CompleteOnUvThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Complete()
|
public void CompleteOnUvThread()
|
||||||
{
|
{
|
||||||
Self.OnWriteCompleted(this);
|
if (Monitor.TryEnter(Self._contextLock))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Self.OnWriteCompleted(this);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
Monitor.Exit(Self._contextLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ThreadPool.QueueUserWorkItem(_completeWrite, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CompleteOnThreadPool()
|
||||||
|
{
|
||||||
|
lock (Self._contextLock)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Self.OnWriteCompleted(this);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Self._log.LogError("SocketOutput.OnWriteCompleted", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void PoolWriteReq(UvWriteReq writeReq)
|
private void PoolWriteReq(UvWriteReq writeReq)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue