Use LibuvThread inspired IO Queue in Socket transport (#2368)
This commit is contained in:
parent
f52771d0e9
commit
71bff00c0d
|
|
@ -3,7 +3,6 @@
|
|||
"editor.tabSize": 2
|
||||
},
|
||||
"files.trimTrailingWhitespace": true,
|
||||
"files.insertFinalNewline": true,
|
||||
"files.associations": {
|
||||
"*.props": "xml",
|
||||
"*.targets": "xml"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.IO.Pipelines;
|
||||
using System.Threading;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
||||
{
|
||||
public class IOQueue : PipeScheduler
|
||||
{
|
||||
private static readonly WaitCallback _doWorkCallback = s => ((IOQueue)s).DoWork();
|
||||
|
||||
private readonly object _workSync = new object();
|
||||
private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
|
||||
private bool _doingWork;
|
||||
|
||||
public override void Schedule<T>(Action<T> action, T state)
|
||||
{
|
||||
var work = new Work
|
||||
{
|
||||
CallbackAdapter = (c, s) => ((Action<T>)c)((T)s),
|
||||
Callback = action,
|
||||
State = state
|
||||
};
|
||||
|
||||
_workItems.Enqueue(work);
|
||||
|
||||
lock (_workSync)
|
||||
{
|
||||
if (!_doingWork)
|
||||
{
|
||||
System.Threading.ThreadPool.QueueUserWorkItem(_doWorkCallback, this);
|
||||
_doingWork = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void DoWork()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
while (_workItems.TryDequeue(out Work item))
|
||||
{
|
||||
item.CallbackAdapter(item.Callback, item.State);
|
||||
}
|
||||
|
||||
lock (_workSync)
|
||||
{
|
||||
if (_workItems.IsEmpty)
|
||||
{
|
||||
_doingWork = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private struct Work
|
||||
{
|
||||
public Action<object, object> CallbackAdapter;
|
||||
public object Callback;
|
||||
public object State;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
|
|
@ -13,10 +14,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
private static readonly Action _callbackCompleted = () => { };
|
||||
|
||||
private readonly PipeScheduler _ioScheduler;
|
||||
|
||||
private Action _callback;
|
||||
private int _bytesTransfered;
|
||||
private int _bytesTransferred;
|
||||
private SocketError _error;
|
||||
|
||||
public SocketAwaitable(PipeScheduler ioScheduler)
|
||||
{
|
||||
_ioScheduler = ioScheduler;
|
||||
}
|
||||
|
||||
public SocketAwaitable GetAwaiter() => this;
|
||||
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
|
||||
|
||||
|
|
@ -31,7 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
throw new SocketException((int)_error);
|
||||
}
|
||||
|
||||
return _bytesTransfered;
|
||||
return _bytesTransferred;
|
||||
}
|
||||
|
||||
public void OnCompleted(Action continuation)
|
||||
|
|
@ -51,8 +59,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
public void Complete(int bytesTransferred, SocketError socketError)
|
||||
{
|
||||
_error = socketError;
|
||||
_bytesTransfered = bytesTransferred;
|
||||
Interlocked.Exchange(ref _callback, _callbackCompleted)?.Invoke();
|
||||
_bytesTransferred = bytesTransferred;
|
||||
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
|
||||
|
||||
if (continuation != null)
|
||||
{
|
||||
_ioScheduler.Schedule(c => c(), continuation);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ using System.IO;
|
|||
using System.IO.Pipelines;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Protocols;
|
||||
using System.Threading;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
|
|
@ -19,15 +19,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
internal sealed class SocketConnection : TransportConnection
|
||||
{
|
||||
private const int MinAllocBufferSize = 2048;
|
||||
public static bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
|
||||
|
||||
private readonly Socket _socket;
|
||||
private readonly PipeScheduler _scheduler;
|
||||
private readonly ISocketsTrace _trace;
|
||||
private readonly SocketReceiver _receiver;
|
||||
private readonly SocketSender _sender;
|
||||
|
||||
private volatile bool _aborted;
|
||||
|
||||
internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, ISocketsTrace trace)
|
||||
internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, PipeScheduler scheduler, ISocketsTrace trace)
|
||||
{
|
||||
Debug.Assert(socket != null);
|
||||
Debug.Assert(memoryPool != null);
|
||||
|
|
@ -35,6 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
|
||||
_socket = socket;
|
||||
MemoryPool = memoryPool;
|
||||
_scheduler = scheduler;
|
||||
_trace = trace;
|
||||
|
||||
var localEndPoint = (IPEndPoint)_socket.LocalEndPoint;
|
||||
|
|
@ -46,13 +49,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
RemoteAddress = remoteEndPoint.Address;
|
||||
RemotePort = remoteEndPoint.Port;
|
||||
|
||||
_receiver = new SocketReceiver(_socket);
|
||||
_sender = new SocketSender(_socket);
|
||||
// On *nix platforms, Sockets already dispatches to the ThreadPool.
|
||||
var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline;
|
||||
|
||||
_receiver = new SocketReceiver(_socket, awaiterScheduler);
|
||||
_sender = new SocketSender(_socket, awaiterScheduler);
|
||||
}
|
||||
|
||||
public override MemoryPool<byte> MemoryPool { get; }
|
||||
public override PipeScheduler InputWriterScheduler => PipeScheduler.Inline;
|
||||
public override PipeScheduler OutputReaderScheduler => PipeScheduler.ThreadPool;
|
||||
public override PipeScheduler InputWriterScheduler => _scheduler;
|
||||
public override PipeScheduler OutputReaderScheduler => _scheduler;
|
||||
|
||||
public async Task StartAsync(IConnectionHandler connectionHandler)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
||||
|
|
@ -10,11 +11,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
private readonly Socket _socket;
|
||||
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
|
||||
private readonly SocketAwaitable _awaitable = new SocketAwaitable();
|
||||
private readonly SocketAwaitable _awaitable;
|
||||
|
||||
public SocketReceiver(Socket socket)
|
||||
public SocketReceiver(Socket socket, PipeScheduler scheduler)
|
||||
{
|
||||
_socket = socket;
|
||||
_awaitable = new SocketAwaitable(scheduler);
|
||||
_eventArgs.UserToken = _awaitable;
|
||||
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.InteropServices;
|
||||
|
||||
|
|
@ -14,13 +15,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
private readonly Socket _socket;
|
||||
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
|
||||
private readonly SocketAwaitable _awaitable = new SocketAwaitable();
|
||||
private readonly SocketAwaitable _awaitable;
|
||||
|
||||
private List<ArraySegment<byte>> _bufferList;
|
||||
|
||||
public SocketSender(Socket socket)
|
||||
public SocketSender(Socket socket, PipeScheduler scheduler)
|
||||
{
|
||||
_socket = socket;
|
||||
_awaitable = new SocketAwaitable(scheduler);
|
||||
_eventArgs.UserToken = _awaitable;
|
||||
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
using System;
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.ExceptionServices;
|
||||
|
|
@ -19,10 +20,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
{
|
||||
internal sealed class SocketTransport : ITransport
|
||||
{
|
||||
private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool };
|
||||
|
||||
private readonly MemoryPool<byte> _memoryPool = KestrelMemoryPool.Create();
|
||||
private readonly IEndPointInformation _endPointInformation;
|
||||
private readonly IConnectionHandler _handler;
|
||||
private readonly IApplicationLifetime _appLifetime;
|
||||
private readonly int _numSchedulers;
|
||||
private readonly PipeScheduler[] _schedulers;
|
||||
private readonly ISocketsTrace _trace;
|
||||
private Socket _listenSocket;
|
||||
private Task _listenTask;
|
||||
|
|
@ -33,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
IEndPointInformation endPointInformation,
|
||||
IConnectionHandler handler,
|
||||
IApplicationLifetime applicationLifetime,
|
||||
int ioQueueCount,
|
||||
ISocketsTrace trace)
|
||||
{
|
||||
Debug.Assert(endPointInformation != null);
|
||||
|
|
@ -45,6 +51,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
_handler = handler;
|
||||
_appLifetime = applicationLifetime;
|
||||
_trace = trace;
|
||||
|
||||
if (ioQueueCount > 0)
|
||||
{
|
||||
_numSchedulers = ioQueueCount;
|
||||
_schedulers = new IOQueue[_numSchedulers];
|
||||
|
||||
for (var i = 0; i < _numSchedulers; i++)
|
||||
{
|
||||
_schedulers[i] = new IOQueue();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_numSchedulers = ThreadPoolSchedulerArray.Length;
|
||||
_schedulers = ThreadPoolSchedulerArray;
|
||||
}
|
||||
}
|
||||
|
||||
public Task BindAsync()
|
||||
|
|
@ -125,22 +147,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
{
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
for (var schedulerIndex = 0; schedulerIndex < _numSchedulers; schedulerIndex++)
|
||||
{
|
||||
var acceptSocket = await _listenSocket.AcceptAsync();
|
||||
acceptSocket.NoDelay = _endPointInformation.NoDelay;
|
||||
try
|
||||
{
|
||||
var acceptSocket = await _listenSocket.AcceptAsync();
|
||||
acceptSocket.NoDelay = _endPointInformation.NoDelay;
|
||||
|
||||
var connection = new SocketConnection(acceptSocket, _memoryPool, _trace);
|
||||
_ = connection.StartAsync(_handler);
|
||||
}
|
||||
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
|
||||
{
|
||||
// REVIEW: Should there be a seperate log message for a connection reset this early?
|
||||
_trace.ConnectionReset(connectionId: "(null)");
|
||||
}
|
||||
catch (SocketException ex) when (!_unbinding)
|
||||
{
|
||||
_trace.ConnectionError(connectionId: "(null)", ex);
|
||||
var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[schedulerIndex], _trace);
|
||||
_ = connection.StartAsync(_handler);
|
||||
}
|
||||
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
|
||||
{
|
||||
// REVIEW: Should there be a seperate log message for a connection reset this early?
|
||||
_trace.ConnectionReset(connectionId: "(null)");
|
||||
}
|
||||
catch (SocketException ex) when (!_unbinding)
|
||||
{
|
||||
_trace.ConnectionError(connectionId: "(null)", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,8 +12,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
{
|
||||
public sealed class SocketTransportFactory : ITransportFactory
|
||||
{
|
||||
private readonly SocketsTrace _trace;
|
||||
private readonly SocketTransportOptions _options;
|
||||
private readonly IApplicationLifetime _appLifetime;
|
||||
private readonly SocketsTrace _trace;
|
||||
|
||||
public SocketTransportFactory(
|
||||
IOptions<SocketTransportOptions> options,
|
||||
|
|
@ -33,6 +34,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
throw new ArgumentNullException(nameof(loggerFactory));
|
||||
}
|
||||
|
||||
_options = options.Value;
|
||||
_appLifetime = applicationLifetime;
|
||||
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets");
|
||||
_trace = new SocketsTrace(logger);
|
||||
|
|
@ -55,7 +57,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
|||
throw new ArgumentNullException(nameof(handler));
|
||||
}
|
||||
|
||||
return new SocketTransport(endPointInformation, handler, _appLifetime, _trace);
|
||||
return new SocketTransport(endPointInformation, handler, _appLifetime, _options.IOQueueCount, _trace);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,18 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
|
||||
{
|
||||
// TODO: Come up with some options
|
||||
public class SocketTransportOptions
|
||||
{
|
||||
|
||||
/// <summary>
|
||||
/// The number of I/O queues used to process requests. Set to 0 to directly schedule I/O to the ThreadPool.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
|
||||
/// </remarks>
|
||||
public int IOQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue