diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs
index b187ad7e30..7a6da45db8 100644
--- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs
+++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs
@@ -10,23 +10,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
 {
     public class IOQueue : PipeScheduler, IThreadPoolWorkItem
     {
-        private readonly object _workSync = new object();
         private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
-        private bool _doingWork;
+        private int _doingWork;
 
         public override void Schedule(Action<object> action, object state)
         {
-            var work = new Work(action, state);
+            _workItems.Enqueue(new Work(action, state));
 
-            _workItems.Enqueue(work);
-
-            lock (_workSync)
+            // Set working if it wasn't (via atomic Interlocked).
+            if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
             {
-                if (!_doingWork)
-                {
-                    System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
-                    _doingWork = true;
-                }
+                // Wasn't working, schedule.
+                System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
             }
         }
 
@@ -39,14 +34,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
                 item.Callback(item.State);
             }
 
-            lock (_workSync)
+            // All work done.
+
+            // Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
+            // This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
+            _doingWork = 0;
+
+            // Ensure _doingWork is written before IsEmpty is read.
+            // As they are two different memory locations, we insert a barrier to guarantee ordering.
+            Thread.MemoryBarrier();
+
+            // Check if there is work to do
+            if (_workItems.IsEmpty)
             {
-                if (_workItems.IsEmpty)
-                {
-                    _doingWork = false;
-                    return;
-                }
+                // Nothing to do, exit.
+                break;
             }
+
+            // Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
+            if (Interlocked.Exchange(ref _doingWork, 1) == 1)
+            {
+                // Execute has been rescheduled already, exit.
+                break;
+            }
+
+            // Is work, wasn't already scheduled so continue loop.
         }
     }
 
diff --git a/src/Servers/Kestrel/Transport.Sockets/src/SocketTransport.cs b/src/Servers/Kestrel/Transport.Sockets/src/SocketTransport.cs
index 82599f20b7..3e40d1ce8b 100644
--- a/src/Servers/Kestrel/Transport.Sockets/src/SocketTransport.cs
+++ b/src/Servers/Kestrel/Transport.Sockets/src/SocketTransport.cs
@@ -19,8 +19,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
 {
     internal sealed class SocketTransport : ITransport
     {
-        private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool };
-
         private readonly MemoryPool<byte> _memoryPool;
         private readonly IEndPointInformation _endPointInformation;
         private readonly IConnectionDispatcher _dispatcher;
@@ -65,8 +63,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
             }
             else
             {
-                _numSchedulers = ThreadPoolSchedulerArray.Length;
-                _schedulers = ThreadPoolSchedulerArray;
+                var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
+                _numSchedulers = directScheduler.Length;
+                _schedulers = directScheduler;
             }
         }
 
diff --git a/src/Servers/Kestrel/perf/Kestrel.Performance/SchedulerBenchmark.cs b/src/Servers/Kestrel/perf/Kestrel.Performance/SchedulerBenchmark.cs
new file mode 100644
index 0000000000..df109a172e
--- /dev/null
+++ b/src/Servers/Kestrel/perf/Kestrel.Performance/SchedulerBenchmark.cs
@@ -0,0 +1,119 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO.Pipelines;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
+
+namespace Microsoft.AspNetCore.Server.Kestrel.Performance
+{
+    public class SchedulerBenchmark
+    {
+        private const int InnerLoopCount = 1024;
+        private const int OuterLoopCount = 64;
+        private const int OperationsPerInvoke = InnerLoopCount * OuterLoopCount;
+
+        private readonly static int IOQueueCount = Math.Min(Environment.ProcessorCount, 16);
+
+        private PipeScheduler[] _ioQueueSchedulers;
+        private PipeScheduler[] _threadPoolSchedulers;
+        private PipeScheduler[] _inlineSchedulers;
+
+        private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
+        private int _totalToReport;
+        private PaddedInteger[] _counters = new PaddedInteger[OuterLoopCount];
+
+        private Func<int, ParallelLoopState, PipeScheduler[], PipeScheduler[]> _parallelAction;
+        private Action<object> _action;
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            _parallelAction = ParallelBody;
+            _action = new Action<object>(ScheduledAction);
+
+            _inlineSchedulers = new PipeScheduler[IOQueueCount];
+            for (var i = 0; i < _inlineSchedulers.Length; i++)
+            {
+                _inlineSchedulers[i] = PipeScheduler.Inline;
+            }
+
+            _threadPoolSchedulers = new PipeScheduler[IOQueueCount];
+            for (var i = 0; i < _threadPoolSchedulers.Length; i++)
+            {
+                _threadPoolSchedulers[i] = PipeScheduler.ThreadPool;
+            }
+
+            _ioQueueSchedulers = new PipeScheduler[IOQueueCount];
+            for (var i = 0; i < _ioQueueSchedulers.Length; i++)
+            {
+                _ioQueueSchedulers[i] = new IOQueue();
+            }
+        }
+
+        [IterationSetup]
+        public void IterationSetup()
+        {
+            _totalToReport = OuterLoopCount;
+
+            for (var i = 0; i < _counters.Length; i++)
+            {
+                _counters[i].Remaining = InnerLoopCount;
+            }
+        }
+
+        [Benchmark(OperationsPerInvoke = OperationsPerInvoke, Baseline = true)]
+        public void ThreadPoolScheduler() => Schedule(_threadPoolSchedulers);
+
+        [Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
+        public void IOQueueScheduler() => Schedule(_ioQueueSchedulers);
+
+        [Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
+        public void InlineScheduler() => Schedule(_inlineSchedulers);
+
+        private void Schedule(PipeScheduler[] schedulers)
+        {
+            Parallel.For(0, OuterLoopCount, () => schedulers, _parallelAction, (s) => { });
+
+            while (_totalToReport > 0)
+            {
+                _semaphore.Wait();
+                _totalToReport--;
+            }
+        }
+
+        private void ScheduledAction(object o)
+        {
+            var counter = (int)o;
+            var result = Interlocked.Decrement(ref _counters[counter].Remaining);
+            if (result == 0)
+            {
+                _semaphore.Release();
+            }
+        }
+
+        private PipeScheduler[] ParallelBody(int i, ParallelLoopState state, PipeScheduler[] schedulers)
+        {
+            PipeScheduler pipeScheduler = schedulers[i % schedulers.Length];
+            object counter = i;
+            for (var t = 0; t < InnerLoopCount; t++)
+            {
+                pipeScheduler.Schedule(_action, counter);
+            }
+
+            return schedulers;
+        }
+
+        [StructLayout(LayoutKind.Explicit, Size = 128)]
+        private struct PaddedInteger
+        {
+            // Padded to avoid false sharing
+            [FieldOffset(64)]
+            public int Remaining;
+        }
+    }
+}