From 9727a4db865780904f88b18a2636826dd45671d1 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Thu, 28 Jul 2016 16:34:12 -0700 Subject: [PATCH] Improve the performance of connection filters - Buffer connection filter input less by using ProducingStart/Complete - Simplify FilteredStreamAdapter --- .../Filter/Internal/FilteredStreamAdapter.cs | 75 +++++-------------- .../Internal/Http/Connection.cs | 11 +-- .../Internal/Http/Frame.cs | 2 +- .../Internal/Http/SocketInput.cs | 39 ---------- .../TestHelpers/SocketInputExtensions.cs | 37 +++++++++ .../TestInput.cs | 1 + 6 files changed, 58 insertions(+), 107 deletions(-) create mode 100644 test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/SocketInputExtensions.cs diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/FilteredStreamAdapter.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/FilteredStreamAdapter.cs index e17a988b45..4f4a0f9405 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Filter/Internal/FilteredStreamAdapter.cs @@ -6,18 +6,12 @@ using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure; -using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal { public class FilteredStreamAdapter : IDisposable { - private readonly string _connectionId; private readonly Stream _filteredStream; - private readonly IKestrelTrace _log; - private readonly MemoryPool _memory; - private MemoryPoolBlock _block; - private bool _aborted = false; public FilteredStreamAdapter( string connectionId, @@ -30,72 +24,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal SocketInput = new SocketInput(memory, threadPool, bufferSizeControl); SocketOutput = new StreamSocketOutput(connectionId, filteredStream, memory, logger); - _connectionId = connectionId; - _log = logger; _filteredStream = filteredStream; - _memory = memory; } - public SocketInput SocketInput { get; private set; } + public SocketInput SocketInput { get; } - public ISocketOutput SocketOutput { get; private set; } - - public Task ReadInputAsync() - { - _block = _memory.Lease(); - // Use pooled block for copy - return FilterInputAsync(_block).ContinueWith((task, state) => - { - ((FilteredStreamAdapter)state).OnStreamClose(task); - }, this); - } - - public void Abort() - { - _aborted = true; - } + public ISocketOutput SocketOutput { get; } public void Dispose() { SocketInput.Dispose(); } - private async Task FilterInputAsync(MemoryPoolBlock block) + public async Task ReadInputAsync() { int bytesRead; - while ((bytesRead = await _filteredStream.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0) - { - SocketInput.IncomingData(block.Array, block.Data.Offset, bytesRead); - } - } - private void OnStreamClose(Task copyAsyncTask) - { - _memory.Return(_block); + do + { + var block = SocketInput.IncomingStart(); - if (copyAsyncTask.IsFaulted) - { - SocketInput.AbortAwaiting(); - _log.LogError(0, copyAsyncTask.Exception, "FilteredStreamAdapter.CopyToAsync"); - } - else if (copyAsyncTask.IsCanceled) - { - SocketInput.AbortAwaiting(); - _log.LogError("FilteredStreamAdapter.CopyToAsync canceled."); - } - else if (_aborted) - { - SocketInput.AbortAwaiting(); - } + try + { + var count = block.Data.Offset + block.Data.Count - block.End; + bytesRead = await _filteredStream.ReadAsync(block.Array, block.End, count); + } + catch (Exception ex) + { + SocketInput.IncomingComplete(0, ex); + throw; + } - try - { - SocketInput.IncomingFin(); - } - catch (Exception ex) - { - _log.LogError(0, ex, "FilteredStreamAdapter.OnStreamClose"); - } + SocketInput.IncomingComplete(bytesRead, error: null); + } while (bytesRead != 0); } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs index 09a8c73d15..53de8bb857 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs @@ -144,21 +144,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http { if (_filteredStreamAdapter != null) { - _filteredStreamAdapter.Abort(); - SocketInput.IncomingFin(); _readInputTask.ContinueWith((task, state) => { var connection = (Connection)state; connection._filterContext.Connection.Dispose(); connection._filteredStreamAdapter.Dispose(); - connection.SocketInput.Dispose(); }, this); } - else - { - SocketInput.Dispose(); - } + SocketInput.Dispose(); _socketClosedTcs.TrySetResult(null); } @@ -176,9 +170,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http _frame.PrepareRequest = _filterContext.PrepareRequest; - // Reset needs to be called here so prepare request gets applied - _frame.Reset(); - _frame.Start(); } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs index ad46367135..447a437ca3 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs @@ -72,7 +72,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http _pathBase = context.ServerAddress.PathBase; FrameControl = this; - Reset(); } public string ConnectionIdFeature { get; set; } @@ -313,6 +312,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http /// public void Start() { + Reset(); _requestProcessingTask = Task.Factory.StartNew( (o) => ((Frame)o).RequestProcessingAsync(), diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs index 9c7f43031e..61f477ea4a 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs @@ -67,39 +67,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http return _pinned; } - public void IncomingData(byte[] buffer, int offset, int count) - { - lock (_sync) - { - // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0 - _bufferSizeControl?.Add(count); - - if (count > 0) - { - if (_tail == null) - { - _tail = _memory.Lease(); - } - - var iterator = new MemoryPoolIterator(_tail, _tail.End); - iterator.CopyFrom(buffer, offset, count); - - if (_head == null) - { - _head = _tail; - } - - _tail = iterator.Block; - } - else - { - FinReceived(); - } - - Complete(); - } - } - public void IncomingComplete(int count, Exception error) { lock (_sync) @@ -156,12 +123,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http } } - public void IncomingFin() - { - // Force a FIN - IncomingData(null, 0, 0); - } - private void Complete() { var awaitableState = Interlocked.Exchange( diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/SocketInputExtensions.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/SocketInputExtensions.cs new file mode 100644 index 0000000000..998e0552c5 --- /dev/null +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/TestHelpers/SocketInputExtensions.cs @@ -0,0 +1,37 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using Microsoft.AspNetCore.Server.Kestrel.Internal.Http; + +namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers +{ + public static class SocketInputExtensions + { + public static void IncomingData(this SocketInput input, byte[] buffer, int offset, int count) + { + var bufferIndex = offset; + var remaining = count; + + while (remaining > 0) + { + var block = input.IncomingStart(); + + var bytesLeftInBlock = block.Data.Offset + block.Data.Count - block.End; + var bytesToCopy = remaining < bytesLeftInBlock ? remaining : bytesLeftInBlock; + + Buffer.BlockCopy(buffer, bufferIndex, block.Array, block.End, bytesToCopy); + + bufferIndex += bytesToCopy; + remaining -= bytesToCopy; + + input.IncomingComplete(bytesToCopy, null); + } + } + + public static void IncomingFin(this SocketInput input) + { + input.IncomingComplete(0, null); + } + } +} diff --git a/test/Microsoft.AspNetCore.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNetCore.Server.KestrelTests/TestInput.cs index 08106ed1c0..955cb088b9 100644 --- a/test/Microsoft.AspNetCore.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNetCore.Server.KestrelTests/TestInput.cs @@ -8,6 +8,7 @@ using Microsoft.AspNetCore.Server.Kestrel; using Microsoft.AspNetCore.Server.Kestrel.Internal; using Microsoft.AspNetCore.Server.Kestrel.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.KestrelTests.TestHelpers; namespace Microsoft.AspNetCore.Server.KestrelTests {