diff --git a/.vscode/settings.json b/.vscode/settings.json index 2c3c5e92d1..0c60b84e84 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,7 +3,6 @@ "editor.tabSize": 2 }, "files.trimTrailingWhitespace": true, - "files.insertFinalNewline": true, "files.associations": { "*.props": "xml", "*.targets": "xml" diff --git a/src/Kestrel.Transport.Sockets/Internal/IOQueue.cs b/src/Kestrel.Transport.Sockets/Internal/IOQueue.cs new file mode 100644 index 0000000000..49b6b3e668 --- /dev/null +++ b/src/Kestrel.Transport.Sockets/Internal/IOQueue.cs @@ -0,0 +1,67 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.IO.Pipelines; +using System.Threading; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal +{ + public class IOQueue : PipeScheduler + { + private static readonly WaitCallback _doWorkCallback = s => ((IOQueue)s).DoWork(); + + private readonly object _workSync = new object(); + private readonly ConcurrentQueue _workItems = new ConcurrentQueue(); + private bool _doingWork; + + public override void Schedule(Action action, T state) + { + var work = new Work + { + CallbackAdapter = (c, s) => ((Action)c)((T)s), + Callback = action, + State = state + }; + + _workItems.Enqueue(work); + + lock (_workSync) + { + if (!_doingWork) + { + System.Threading.ThreadPool.QueueUserWorkItem(_doWorkCallback, this); + _doingWork = true; + } + } + } + + private void DoWork() + { + while (true) + { + while (_workItems.TryDequeue(out Work item)) + { + item.CallbackAdapter(item.Callback, item.State); + } + + lock (_workSync) + { + if (_workItems.IsEmpty) + { + _doingWork = false; + return; + } + } + } + } + + private struct Work + { + public Action CallbackAdapter; + public object Callback; + public object State; + } + } +} diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketAwaitable.cs b/src/Kestrel.Transport.Sockets/Internal/SocketAwaitable.cs index 0ed4ac89bc..f266033a36 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketAwaitable.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketAwaitable.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.IO.Pipelines; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading; @@ -13,10 +14,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { private static readonly Action _callbackCompleted = () => { }; + private readonly PipeScheduler _ioScheduler; + private Action _callback; - private int _bytesTransfered; + private int _bytesTransferred; private SocketError _error; + public SocketAwaitable(PipeScheduler ioScheduler) + { + _ioScheduler = ioScheduler; + } + public SocketAwaitable GetAwaiter() => this; public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); @@ -31,7 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal throw new SocketException((int)_error); } - return _bytesTransfered; + return _bytesTransferred; } public void OnCompleted(Action continuation) @@ -51,8 +59,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal public void Complete(int bytesTransferred, SocketError socketError) { _error = socketError; - _bytesTransfered = bytesTransferred; - Interlocked.Exchange(ref _callback, _callbackCompleted)?.Invoke(); + _bytesTransferred = bytesTransferred; + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + + if (continuation != null) + { + _ioScheduler.Schedule(c => c(), continuation); + } } } -} \ No newline at end of file +} diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs index 87fee3379a..d932192165 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketConnection.cs @@ -8,9 +8,9 @@ using System.IO; using System.IO.Pipelines; using System.Net; using System.Net.Sockets; +using System.Runtime.InteropServices; using System.Threading.Tasks; using Microsoft.AspNetCore.Protocols; -using System.Threading; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; using Microsoft.Extensions.Logging; @@ -19,15 +19,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal internal sealed class SocketConnection : TransportConnection { private const int MinAllocBufferSize = 2048; + public static bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); private readonly Socket _socket; + private readonly PipeScheduler _scheduler; private readonly ISocketsTrace _trace; private readonly SocketReceiver _receiver; private readonly SocketSender _sender; private volatile bool _aborted; - internal SocketConnection(Socket socket, MemoryPool memoryPool, ISocketsTrace trace) + internal SocketConnection(Socket socket, MemoryPool memoryPool, PipeScheduler scheduler, ISocketsTrace trace) { Debug.Assert(socket != null); Debug.Assert(memoryPool != null); @@ -35,6 +37,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal _socket = socket; MemoryPool = memoryPool; + _scheduler = scheduler; _trace = trace; var localEndPoint = (IPEndPoint)_socket.LocalEndPoint; @@ -46,13 +49,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal RemoteAddress = remoteEndPoint.Address; RemotePort = remoteEndPoint.Port; - _receiver = new SocketReceiver(_socket); - _sender = new SocketSender(_socket); + // On *nix platforms, Sockets already dispatches to the ThreadPool. + var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline; + + _receiver = new SocketReceiver(_socket, awaiterScheduler); + _sender = new SocketSender(_socket, awaiterScheduler); } public override MemoryPool MemoryPool { get; } - public override PipeScheduler InputWriterScheduler => PipeScheduler.Inline; - public override PipeScheduler OutputReaderScheduler => PipeScheduler.ThreadPool; + public override PipeScheduler InputWriterScheduler => _scheduler; + public override PipeScheduler OutputReaderScheduler => _scheduler; public async Task StartAsync(IConnectionHandler connectionHandler) { diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketReceiver.cs b/src/Kestrel.Transport.Sockets/Internal/SocketReceiver.cs index 0aff7d2cfd..150978e473 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketReceiver.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketReceiver.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.IO.Pipelines; using System.Net.Sockets; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal @@ -10,11 +11,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { private readonly Socket _socket; private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); - private readonly SocketAwaitable _awaitable = new SocketAwaitable(); + private readonly SocketAwaitable _awaitable; - public SocketReceiver(Socket socket) + public SocketReceiver(Socket socket, PipeScheduler scheduler) { _socket = socket; + _awaitable = new SocketAwaitable(scheduler); _eventArgs.UserToken = _awaitable; _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); } diff --git a/src/Kestrel.Transport.Sockets/Internal/SocketSender.cs b/src/Kestrel.Transport.Sockets/Internal/SocketSender.cs index 2199c60ef5..a8b0727c0a 100644 --- a/src/Kestrel.Transport.Sockets/Internal/SocketSender.cs +++ b/src/Kestrel.Transport.Sockets/Internal/SocketSender.cs @@ -5,6 +5,7 @@ using System; using System.Buffers; using System.Collections.Generic; using System.Diagnostics; +using System.IO.Pipelines; using System.Net.Sockets; using System.Runtime.InteropServices; @@ -14,13 +15,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal { private readonly Socket _socket; private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs(); - private readonly SocketAwaitable _awaitable = new SocketAwaitable(); + private readonly SocketAwaitable _awaitable; private List> _bufferList; - public SocketSender(Socket socket) + public SocketSender(Socket socket, PipeScheduler scheduler) { _socket = socket; + _awaitable = new SocketAwaitable(scheduler); _eventArgs.UserToken = _awaitable; _eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError); } diff --git a/src/Kestrel.Transport.Sockets/SocketTransport.cs b/src/Kestrel.Transport.Sockets/SocketTransport.cs index c27f24767a..9a11e03a0f 100644 --- a/src/Kestrel.Transport.Sockets/SocketTransport.cs +++ b/src/Kestrel.Transport.Sockets/SocketTransport.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; using System.Diagnostics; +using System.IO.Pipelines; using System.Net; using System.Net.Sockets; using System.Runtime.ExceptionServices; @@ -19,10 +20,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { internal sealed class SocketTransport : ITransport { + private static readonly PipeScheduler[] ThreadPoolSchedulerArray = new PipeScheduler[] { PipeScheduler.ThreadPool }; + private readonly MemoryPool _memoryPool = KestrelMemoryPool.Create(); private readonly IEndPointInformation _endPointInformation; private readonly IConnectionHandler _handler; private readonly IApplicationLifetime _appLifetime; + private readonly int _numSchedulers; + private readonly PipeScheduler[] _schedulers; private readonly ISocketsTrace _trace; private Socket _listenSocket; private Task _listenTask; @@ -33,6 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets IEndPointInformation endPointInformation, IConnectionHandler handler, IApplicationLifetime applicationLifetime, + int ioQueueCount, ISocketsTrace trace) { Debug.Assert(endPointInformation != null); @@ -45,6 +51,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets _handler = handler; _appLifetime = applicationLifetime; _trace = trace; + + if (ioQueueCount > 0) + { + _numSchedulers = ioQueueCount; + _schedulers = new IOQueue[_numSchedulers]; + + for (var i = 0; i < _numSchedulers; i++) + { + _schedulers[i] = new IOQueue(); + } + } + else + { + _numSchedulers = ThreadPoolSchedulerArray.Length; + _schedulers = ThreadPoolSchedulerArray; + } } public Task BindAsync() @@ -125,22 +147,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { while (true) { - try + for (var schedulerIndex = 0; schedulerIndex < _numSchedulers; schedulerIndex++) { - var acceptSocket = await _listenSocket.AcceptAsync(); - acceptSocket.NoDelay = _endPointInformation.NoDelay; + try + { + var acceptSocket = await _listenSocket.AcceptAsync(); + acceptSocket.NoDelay = _endPointInformation.NoDelay; - var connection = new SocketConnection(acceptSocket, _memoryPool, _trace); - _ = connection.StartAsync(_handler); - } - catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) - { - // REVIEW: Should there be a seperate log message for a connection reset this early? - _trace.ConnectionReset(connectionId: "(null)"); - } - catch (SocketException ex) when (!_unbinding) - { - _trace.ConnectionError(connectionId: "(null)", ex); + var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[schedulerIndex], _trace); + _ = connection.StartAsync(_handler); + } + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset) + { + // REVIEW: Should there be a seperate log message for a connection reset this early? + _trace.ConnectionReset(connectionId: "(null)"); + } + catch (SocketException ex) when (!_unbinding) + { + _trace.ConnectionError(connectionId: "(null)", ex); + } } } } diff --git a/src/Kestrel.Transport.Sockets/SocketTransportFactory.cs b/src/Kestrel.Transport.Sockets/SocketTransportFactory.cs index 80320da6f2..473a1519cc 100644 --- a/src/Kestrel.Transport.Sockets/SocketTransportFactory.cs +++ b/src/Kestrel.Transport.Sockets/SocketTransportFactory.cs @@ -12,8 +12,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { public sealed class SocketTransportFactory : ITransportFactory { - private readonly SocketsTrace _trace; + private readonly SocketTransportOptions _options; private readonly IApplicationLifetime _appLifetime; + private readonly SocketsTrace _trace; public SocketTransportFactory( IOptions options, @@ -33,6 +34,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets throw new ArgumentNullException(nameof(loggerFactory)); } + _options = options.Value; _appLifetime = applicationLifetime; var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets"); _trace = new SocketsTrace(logger); @@ -55,7 +57,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets throw new ArgumentNullException(nameof(handler)); } - return new SocketTransport(endPointInformation, handler, _appLifetime, _trace); + return new SocketTransport(endPointInformation, handler, _appLifetime, _options.IOQueueCount, _trace); } } } diff --git a/src/Kestrel.Transport.Sockets/SocketTransportOptions.cs b/src/Kestrel.Transport.Sockets/SocketTransportOptions.cs index 05bae64609..b6cec0a6d7 100644 --- a/src/Kestrel.Transport.Sockets/SocketTransportOptions.cs +++ b/src/Kestrel.Transport.Sockets/SocketTransportOptions.cs @@ -1,12 +1,18 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +using System; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { - // TODO: Come up with some options public class SocketTransportOptions { - + /// + /// The number of I/O queues used to process requests. Set to 0 to directly schedule I/O to the ThreadPool. + /// + /// + /// Defaults to rounded down and clamped between 1 and 16. + /// + public int IOQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16); } }