diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs index 5db2fb3ebc..894da621aa 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs @@ -16,6 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http { public const int MaxPooledWriteReqs = 1024; + private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; private const int _maxPooledWriteContexts = 32; @@ -44,7 +45,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http // The number of write operations that have been scheduled so far // but have not completed. - private bool _writePending = false; + private int _ongoingWrites = 0; + // Whether or not a write operation is pending to start on the uv thread. + // If this is true, there is no reason to schedule another write even if + // there aren't yet three ongoing write operations. + private bool _postingWrite = false; + private bool _cancelled = false; private int _numBytesPreCompleted = 0; private Exception _lastWriteError; @@ -185,9 +191,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http } } - if (!_writePending) + if (!_postingWrite && _ongoingWrites < _maxPendingWrites) { - _writePending = true; + _postingWrite = true; + _ongoingWrites++; scheduleWrite = true; } } @@ -325,13 +332,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http if (Monitor.TryEnter(_contextLock)) { - _writePending = false; + _postingWrite = false; if (_nextWriteContext != null) { writingContext = _nextWriteContext; _nextWriteContext = null; } + else + { + _ongoingWrites--; + } Monitor.Exit(_contextLock); } @@ -379,6 +390,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http CompleteAllWrites(); _log.ConnectionError(_connectionId, error); } + + if (!_postingWrite && _nextWriteContext != null) + { + _postingWrite = true; + ScheduleWrite(); + } + else + { + _ongoingWrites--; + } } private void CompleteNextWrite(ref int bytesLeftToBuffer) diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs index 853b22f577..ccab2eafc3 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/SocketOutputTests.cs @@ -544,5 +544,122 @@ namespace Microsoft.AspNetCore.Server.KestrelTests default(ArraySegment), default(CancellationToken), socketDisconnect: true); } } + + [Fact] + public void OnlyAllowsUpToThreeConcurrentWrites() + { + var writeWh = new ManualResetEventSlim(); + var completeQueue = new Queue>(); + + var mockLibuv = new MockLibuv + { + OnWrite = (socket, buffers, triggerCompleted) => + { + writeWh.Set(); + completeQueue.Enqueue(triggerCompleted); + return 0; + } + }; + + using (var memory = new MemoryPool()) + using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + { + kestrelEngine.Start(count: 1); + + var kestrelThread = kestrelEngine.Threads[0]; + var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); + var trace = new KestrelTrace(new TestKestrelTrace()); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue()); + + var buffer = new ArraySegment(new byte[1]); + + // First three writes trigger uv_write + socketOutput.WriteAsync(buffer, CancellationToken.None); + Assert.True(writeWh.Wait(1000)); + writeWh.Reset(); + socketOutput.WriteAsync(buffer, CancellationToken.None); + Assert.True(writeWh.Wait(1000)); + writeWh.Reset(); + socketOutput.WriteAsync(buffer, CancellationToken.None); + Assert.True(writeWh.Wait(1000)); + writeWh.Reset(); + + // The fourth write won't trigger uv_write since the first three haven't completed + socketOutput.WriteAsync(buffer, CancellationToken.None); + Assert.False(writeWh.Wait(1000)); + + // Complete 1st write allowing uv_write to be triggered again + completeQueue.Dequeue()(0); + Assert.True(writeWh.Wait(1000)); + + // Cleanup + var cleanupTask = socketOutput.WriteAsync( + default(ArraySegment), default(CancellationToken), socketDisconnect: true); + + foreach (var triggerCompleted in completeQueue) + { + triggerCompleted(0); + } + } + } + + [Fact] + public void WritesAreAggregated() + { + var writeWh = new ManualResetEventSlim(); + var writeCount = 0; + + var mockLibuv = new MockLibuv + { + OnWrite = (socket, buffers, triggerCompleted) => + { + writeCount++; + triggerCompleted(0); + writeWh.Set(); + return 0; + } + }; + + using (var memory = new MemoryPool()) + using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext())) + { + kestrelEngine.Start(count: 1); + + var kestrelThread = kestrelEngine.Threads[0]; + var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace()); + var trace = new KestrelTrace(new TestKestrelTrace()); + var ltp = new LoggingThreadPool(trace); + var socketOutput = new SocketOutput(kestrelThread, socket, memory, new MockConnection(), "0", trace, ltp, new Queue()); + + var blockThreadWh = new ManualResetEventSlim(); + kestrelThread.Post(_ => + { + blockThreadWh.Wait(); + }, state: null); + + var buffer = new ArraySegment(new byte[1]); + + // Two calls to WriteAsync trigger uv_write once if both calls + // are made before write is scheduled + socketOutput.WriteAsync(buffer, CancellationToken.None); + socketOutput.WriteAsync(buffer, CancellationToken.None); + + blockThreadWh.Set(); + + Assert.True(writeWh.Wait(1000)); + writeWh.Reset(); + + // Write isn't called twice after the thread is unblocked + Assert.False(writeWh.Wait(1000)); + Assert.Equal(1, writeCount); + // One call to ScheduleWrite + One call to Post to block the thread + Assert.Equal(2, mockLibuv.PostCount); + + // Cleanup + var cleanupTask = socketOutput.WriteAsync( + default(ArraySegment), default(CancellationToken), socketDisconnect: true); + } + } } } diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/MockLibuv.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/MockLibuv.cs index 745a3eca11..80b4befc3f 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/MockLibuv.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/MockLibuv.cs @@ -27,6 +27,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers _uv_async_send = postHandle => { + PostCount++; _loopWh.Set(); return 0; @@ -95,6 +96,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers public uv_read_cb ReadCallback { get; set; } + public int PostCount { get; set; } + private int UvReadStart(UvStreamHandle handle, uv_alloc_cb allocCallback, uv_read_cb readCallback) { AllocCallback = allocCallback;