Allow a maximum of 3 concurrent uv_write operations per connection

- Keep logic to prevent unnecessary calls to KestrelThread.Post
- This partially reverts commit 480996433e.
This commit is contained in:
Stephen Halter 2016-05-31 17:20:04 -07:00
parent 0342754c57
commit 2d229e8980
3 changed files with 145 additions and 4 deletions

View File

@ -16,6 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
public const int MaxPooledWriteReqs = 1024;
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;
@ -44,7 +45,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
// The number of write operations that have been scheduled so far
// but have not completed.
private bool _writePending = false;
private int _ongoingWrites = 0;
// Whether or not a write operation is pending to start on the uv thread.
// If this is true, there is no reason to schedule another write even if
// there aren't yet three ongoing write operations.
private bool _postingWrite = false;
private bool _cancelled = false;
private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
@ -185,9 +191,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
}
}
if (!_writePending)
if (!_postingWrite && _ongoingWrites < _maxPendingWrites)
{
_writePending = true;
_postingWrite = true;
_ongoingWrites++;
scheduleWrite = true;
}
}
@ -325,13 +332,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
if (Monitor.TryEnter(_contextLock))
{
_writePending = false;
_postingWrite = false;
if (_nextWriteContext != null)
{
writingContext = _nextWriteContext;
_nextWriteContext = null;
}
else
{
_ongoingWrites--;
}
Monitor.Exit(_contextLock);
}
@ -379,6 +390,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
CompleteAllWrites();
_log.ConnectionError(_connectionId, error);
}
if (!_postingWrite && _nextWriteContext != null)
{
_postingWrite = true;
ScheduleWrite();
}
else
{
_ongoingWrites--;
}
}
private void CompleteNextWrite(ref int bytesLeftToBuffer)

View File

@ -544,5 +544,122 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
}
}
[Fact]
public void OnlyAllowsUpToThreeConcurrentWrites()
{
var writeWh = new ManualResetEventSlim();
var completeQueue = new Queue<Action<int>>();
var mockLibuv = new MockLibuv
{
OnWrite = (socket, buffers, triggerCompleted) =>
{
writeWh.Set();
completeQueue.Enqueue(triggerCompleted);
return 0;
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var buffer = new ArraySegment<byte>(new byte[1]);
// First three writes trigger uv_write
socketOutput.WriteAsync(buffer, CancellationToken.None);
Assert.True(writeWh.Wait(1000));
writeWh.Reset();
socketOutput.WriteAsync(buffer, CancellationToken.None);
Assert.True(writeWh.Wait(1000));
writeWh.Reset();
socketOutput.WriteAsync(buffer, CancellationToken.None);
Assert.True(writeWh.Wait(1000));
writeWh.Reset();
// The fourth write won't trigger uv_write since the first three haven't completed
socketOutput.WriteAsync(buffer, CancellationToken.None);
Assert.False(writeWh.Wait(1000));
// Complete 1st write allowing uv_write to be triggered again
completeQueue.Dequeue()(0);
Assert.True(writeWh.Wait(1000));
// Cleanup
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
}
}
}
[Fact]
public void WritesAreAggregated()
{
var writeWh = new ManualResetEventSlim();
var writeCount = 0;
var mockLibuv = new MockLibuv
{
OnWrite = (socket, buffers, triggerCompleted) =>
{
writeCount++;
triggerCompleted(0);
writeWh.Set();
return 0;
}
};
using (var memory = new MemoryPool())
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
kestrelEngine.Start(count: 1);
var kestrelThread = kestrelEngine.Threads[0];
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new LoggingThreadPool(trace);
var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue<UvWriteReq>());
var blockThreadWh = new ManualResetEventSlim();
kestrelThread.Post(_ =>
{
blockThreadWh.Wait();
}, state: null);
var buffer = new ArraySegment<byte>(new byte[1]);
// Two calls to WriteAsync trigger uv_write once if both calls
// are made before write is scheduled
socketOutput.WriteAsync(buffer, CancellationToken.None);
socketOutput.WriteAsync(buffer, CancellationToken.None);
blockThreadWh.Set();
Assert.True(writeWh.Wait(1000));
writeWh.Reset();
// Write isn't called twice after the thread is unblocked
Assert.False(writeWh.Wait(1000));
Assert.Equal(1, writeCount);
// One call to ScheduleWrite + One call to Post to block the thread
Assert.Equal(2, mockLibuv.PostCount);
// Cleanup
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
}
}
}
}

View File

@ -27,6 +27,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
_uv_async_send = postHandle =>
{
PostCount++;
_loopWh.Set();
return 0;
@ -95,6 +96,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
public uv_read_cb ReadCallback { get; set; }
public int PostCount { get; set; }
private int UvReadStart(UvStreamHandle handle, uv_alloc_cb allocCallback, uv_read_cb readCallback)
{
AllocCallback = allocCallback;