Lock-free IOQueue (#6154)

Second half of https://github.com/aspnet/AspNetCore/pull/4060
This commit is contained in:
Ben Adams 2019-02-16 02:26:30 +00:00 committed by Stephen Halter
parent 394ef0ab93
commit 639d290b19
3 changed files with 151 additions and 21 deletions

View File

@@ -10,23 +10,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
public class IOQueue : PipeScheduler, IThreadPoolWorkItem
{
private readonly object _workSync = new object();
private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
private bool _doingWork;
private int _doingWork;
public override void Schedule(Action<object> action, object state)
{
var work = new Work(action, state);
_workItems.Enqueue(new Work(action, state));
_workItems.Enqueue(work);
lock (_workSync)
// Set working if it wasn't (via atomic Interlocked).
if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
{
if (!_doingWork)
{
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
_doingWork = true;
}
// Wasn't working, schedule.
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}
@@ -39,14 +34,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
item.Callback(item.State);
}
lock (_workSync)
// All work done.
// Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
// This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
_doingWork = 0;
// Ensure _doingWork is written before IsEmpty is read.
// As they are two different memory locations, we insert a barrier to guarantee ordering.
Thread.MemoryBarrier();
// Check if there is work to do
if (_workItems.IsEmpty)
{
if (_workItems.IsEmpty)
{
_doingWork = false;
return;
}
// Nothing to do, exit.
break;
}
// Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
if (Interlocked.Exchange(ref _doingWork, 1) == 1)
{
// Execute has been rescheduled already, exit.
break;
}
// Is work, wasn't already scheduled so continue loop.
}
}

View File

@@ -19,8 +19,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
internal sealed class SocketTransport : ITransport
{
private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool };
private readonly MemoryPool<byte> _memoryPool;
private readonly IEndPointInformation _endPointInformation;
private readonly IConnectionDispatcher _dispatcher;
@@ -65,8 +63,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
}
else
{
_numSchedulers = ThreadPoolSchedulerArray.Length;
_schedulers = ThreadPoolSchedulerArray;
var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
_numSchedulers = directScheduler.Length;
_schedulers = directScheduler;
}
}

View File

@@ -0,0 +1,119 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Performance
{
/// <summary>
/// Compares the throughput of scheduling many small callbacks through
/// <see cref="PipeScheduler.ThreadPool"/>, Kestrel's <c>IOQueue</c>, and
/// <see cref="PipeScheduler.Inline"/>. Each benchmark invocation schedules
/// <c>OuterLoopCount * InnerLoopCount</c> callbacks and waits for all of
/// them to complete.
/// </summary>
public class SchedulerBenchmark
{
    private const int InnerLoopCount = 1024;
    private const int OuterLoopCount = 64;
    private const int OperationsPerInvoke = InnerLoopCount * OuterLoopCount;

    // Mirrors the transport's scheduler count: one per core, capped at 16.
    // Fix: modifier order was "private readonly static"; convention is "private static readonly".
    private static readonly int IOQueueCount = Math.Min(Environment.ProcessorCount, 16);

    private PipeScheduler[] _ioQueueSchedulers;
    private PipeScheduler[] _threadPoolSchedulers;
    private PipeScheduler[] _inlineSchedulers;

    // Released once per outer-loop counter when its InnerLoopCount decrements finish;
    // Schedule() waits on it OuterLoopCount times, so the count is balanced per invoke.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _totalToReport;
    // One padded counter per outer iteration so concurrent decrements on different
    // counters don't share a cache line (see PaddedInteger below).
    private readonly PaddedInteger[] _counters = new PaddedInteger[OuterLoopCount];

    // Delegates are cached in GlobalSetup so the measured loops don't allocate per call.
    private Func<int, ParallelLoopState, PipeScheduler[], PipeScheduler[]> _parallelAction;
    private Action<object> _action;

    /// <summary>Allocates the scheduler arrays and cached delegates once per run.</summary>
    [GlobalSetup]
    public void Setup()
    {
        _parallelAction = ParallelBody;
        // Method-group conversion; the explicit "new Action<object>(...)" was redundant.
        _action = ScheduledAction;

        _inlineSchedulers = new PipeScheduler[IOQueueCount];
        for (var i = 0; i < _inlineSchedulers.Length; i++)
        {
            _inlineSchedulers[i] = PipeScheduler.Inline;
        }

        _threadPoolSchedulers = new PipeScheduler[IOQueueCount];
        for (var i = 0; i < _threadPoolSchedulers.Length; i++)
        {
            _threadPoolSchedulers[i] = PipeScheduler.ThreadPool;
        }

        _ioQueueSchedulers = new PipeScheduler[IOQueueCount];
        for (var i = 0; i < _ioQueueSchedulers.Length; i++)
        {
            _ioQueueSchedulers[i] = new IOQueue();
        }
    }

    /// <summary>Resets the per-counter work remaining before every iteration.</summary>
    [IterationSetup]
    public void IterationSetup()
    {
        _totalToReport = OuterLoopCount;

        for (var i = 0; i < _counters.Length; i++)
        {
            _counters[i].Remaining = InnerLoopCount;
        }
    }

    [Benchmark(OperationsPerInvoke = OperationsPerInvoke, Baseline = true)]
    public void ThreadPoolScheduler() => Schedule(_threadPoolSchedulers);

    [Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
    public void IOQueueScheduler() => Schedule(_ioQueueSchedulers);

    [Benchmark(OperationsPerInvoke = OperationsPerInvoke)]
    public void InlineScheduler() => Schedule(_inlineSchedulers);

    // Fans out OuterLoopCount parallel producers, then blocks until every
    // counter has been fully decremented by the scheduled callbacks.
    private void Schedule(PipeScheduler[] schedulers)
    {
        Parallel.For(0, OuterLoopCount, () => schedulers, _parallelAction, (s) => { });

        while (_totalToReport > 0)
        {
            _semaphore.Wait();
            _totalToReport--;
        }
    }

    // Callback executed by the scheduler under test; o is the boxed outer-loop index.
    private void ScheduledAction(object o)
    {
        var counter = (int)o;
        var result = Interlocked.Decrement(ref _counters[counter].Remaining);
        if (result == 0)
        {
            // Last decrement for this counter: report completion to Schedule().
            _semaphore.Release();
        }
    }

    // Parallel.For body: schedules InnerLoopCount callbacks onto one scheduler,
    // round-robined by the outer index. The index is boxed once, outside the loop.
    private PipeScheduler[] ParallelBody(int i, ParallelLoopState state, PipeScheduler[] schedulers)
    {
        PipeScheduler pipeScheduler = schedulers[i % schedulers.Length];
        object counter = i;
        for (var t = 0; t < InnerLoopCount; t++)
        {
            pipeScheduler.Schedule(_action, counter);
        }

        return schedulers;
    }

    [StructLayout(LayoutKind.Explicit, Size = 128)]
    private struct PaddedInteger
    {
        // Padded to avoid false sharing: the field sits alone in its own cache line.
        [FieldOffset(64)]
        public int Remaining;
    }
}
}