Improve the performance of connection filters
- Buffer connection filter input less by using ProducingStart/Complete - Simplify FilteredStreamAdapter
This commit is contained in:
parent
5aee524768
commit
9727a4db86
|
|
@ -6,18 +6,12 @@ using System.IO;
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
|
||||
{
|
||||
public class FilteredStreamAdapter : IDisposable
|
||||
{
|
||||
private readonly string _connectionId;
|
||||
private readonly Stream _filteredStream;
|
||||
private readonly IKestrelTrace _log;
|
||||
private readonly MemoryPool _memory;
|
||||
private MemoryPoolBlock _block;
|
||||
private bool _aborted = false;
|
||||
|
||||
public FilteredStreamAdapter(
|
||||
string connectionId,
|
||||
|
|
@ -30,72 +24,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Filter.Internal
|
|||
SocketInput = new SocketInput(memory, threadPool, bufferSizeControl);
|
||||
SocketOutput = new StreamSocketOutput(connectionId, filteredStream, memory, logger);
|
||||
|
||||
_connectionId = connectionId;
|
||||
_log = logger;
|
||||
_filteredStream = filteredStream;
|
||||
_memory = memory;
|
||||
}
|
||||
|
||||
public SocketInput SocketInput { get; private set; }
|
||||
public SocketInput SocketInput { get; }
|
||||
|
||||
public ISocketOutput SocketOutput { get; private set; }
|
||||
|
||||
public Task ReadInputAsync()
|
||||
{
|
||||
_block = _memory.Lease();
|
||||
// Use pooled block for copy
|
||||
return FilterInputAsync(_block).ContinueWith((task, state) =>
|
||||
{
|
||||
((FilteredStreamAdapter)state).OnStreamClose(task);
|
||||
}, this);
|
||||
}
|
||||
|
||||
public void Abort()
|
||||
{
|
||||
_aborted = true;
|
||||
}
|
||||
public ISocketOutput SocketOutput { get; }
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
SocketInput.Dispose();
|
||||
}
|
||||
|
||||
private async Task FilterInputAsync(MemoryPoolBlock block)
|
||||
public async Task ReadInputAsync()
|
||||
{
|
||||
int bytesRead;
|
||||
while ((bytesRead = await _filteredStream.ReadAsync(block.Array, block.Data.Offset, block.Data.Count)) != 0)
|
||||
{
|
||||
SocketInput.IncomingData(block.Array, block.Data.Offset, bytesRead);
|
||||
}
|
||||
}
|
||||
|
||||
private void OnStreamClose(Task copyAsyncTask)
|
||||
{
|
||||
_memory.Return(_block);
|
||||
do
|
||||
{
|
||||
var block = SocketInput.IncomingStart();
|
||||
|
||||
if (copyAsyncTask.IsFaulted)
|
||||
{
|
||||
SocketInput.AbortAwaiting();
|
||||
_log.LogError(0, copyAsyncTask.Exception, "FilteredStreamAdapter.CopyToAsync");
|
||||
}
|
||||
else if (copyAsyncTask.IsCanceled)
|
||||
{
|
||||
SocketInput.AbortAwaiting();
|
||||
_log.LogError("FilteredStreamAdapter.CopyToAsync canceled.");
|
||||
}
|
||||
else if (_aborted)
|
||||
{
|
||||
SocketInput.AbortAwaiting();
|
||||
}
|
||||
try
|
||||
{
|
||||
var count = block.Data.Offset + block.Data.Count - block.End;
|
||||
bytesRead = await _filteredStream.ReadAsync(block.Array, block.End, count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
SocketInput.IncomingComplete(0, ex);
|
||||
throw;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
SocketInput.IncomingFin();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.LogError(0, ex, "FilteredStreamAdapter.OnStreamClose");
|
||||
}
|
||||
SocketInput.IncomingComplete(bytesRead, error: null);
|
||||
} while (bytesRead != 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -144,21 +144,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
{
|
||||
if (_filteredStreamAdapter != null)
|
||||
{
|
||||
_filteredStreamAdapter.Abort();
|
||||
SocketInput.IncomingFin();
|
||||
_readInputTask.ContinueWith((task, state) =>
|
||||
{
|
||||
var connection = (Connection)state;
|
||||
connection._filterContext.Connection.Dispose();
|
||||
connection._filteredStreamAdapter.Dispose();
|
||||
connection.SocketInput.Dispose();
|
||||
}, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
SocketInput.Dispose();
|
||||
}
|
||||
|
||||
SocketInput.Dispose();
|
||||
_socketClosedTcs.TrySetResult(null);
|
||||
}
|
||||
|
||||
|
|
@ -176,9 +170,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
|
||||
_frame.PrepareRequest = _filterContext.PrepareRequest;
|
||||
|
||||
// Reset needs to be called here so prepare request gets applied
|
||||
_frame.Reset();
|
||||
|
||||
_frame.Start();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,7 +72,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
_pathBase = context.ServerAddress.PathBase;
|
||||
|
||||
FrameControl = this;
|
||||
Reset();
|
||||
}
|
||||
|
||||
public string ConnectionIdFeature { get; set; }
|
||||
|
|
@ -313,6 +312,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
/// </summary>
|
||||
public void Start()
|
||||
{
|
||||
Reset();
|
||||
_requestProcessingTask =
|
||||
Task.Factory.StartNew(
|
||||
(o) => ((Frame)o).RequestProcessingAsync(),
|
||||
|
|
|
|||
|
|
@ -67,39 +67,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
return _pinned;
|
||||
}
|
||||
|
||||
public void IncomingData(byte[] buffer, int offset, int count)
|
||||
{
|
||||
lock (_sync)
|
||||
{
|
||||
// Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
|
||||
_bufferSizeControl?.Add(count);
|
||||
|
||||
if (count > 0)
|
||||
{
|
||||
if (_tail == null)
|
||||
{
|
||||
_tail = _memory.Lease();
|
||||
}
|
||||
|
||||
var iterator = new MemoryPoolIterator(_tail, _tail.End);
|
||||
iterator.CopyFrom(buffer, offset, count);
|
||||
|
||||
if (_head == null)
|
||||
{
|
||||
_head = _tail;
|
||||
}
|
||||
|
||||
_tail = iterator.Block;
|
||||
}
|
||||
else
|
||||
{
|
||||
FinReceived();
|
||||
}
|
||||
|
||||
Complete();
|
||||
}
|
||||
}
|
||||
|
||||
public void IncomingComplete(int count, Exception error)
|
||||
{
|
||||
lock (_sync)
|
||||
|
|
@ -156,12 +123,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
|
|||
}
|
||||
}
|
||||
|
||||
public void IncomingFin()
|
||||
{
|
||||
// Force a FIN
|
||||
IncomingData(null, 0, 0);
|
||||
}
|
||||
|
||||
private void Complete()
|
||||
{
|
||||
var awaitableState = Interlocked.Exchange(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
|
||||
{
|
||||
public static class SocketInputExtensions
|
||||
{
|
||||
public static void IncomingData(this SocketInput input, byte[] buffer, int offset, int count)
|
||||
{
|
||||
var bufferIndex = offset;
|
||||
var remaining = count;
|
||||
|
||||
while (remaining > 0)
|
||||
{
|
||||
var block = input.IncomingStart();
|
||||
|
||||
var bytesLeftInBlock = block.Data.Offset + block.Data.Count - block.End;
|
||||
var bytesToCopy = remaining < bytesLeftInBlock ? remaining : bytesLeftInBlock;
|
||||
|
||||
Buffer.BlockCopy(buffer, bufferIndex, block.Array, block.End, bytesToCopy);
|
||||
|
||||
bufferIndex += bytesToCopy;
|
||||
remaining -= bytesToCopy;
|
||||
|
||||
input.IncomingComplete(bytesToCopy, null);
|
||||
}
|
||||
}
|
||||
|
||||
public static void IncomingFin(this SocketInput input)
|
||||
{
|
||||
input.IncomingComplete(0, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ using Microsoft.AspNetCore.Server.Kestrel;
|
|||
using Microsoft.AspNetCore.Server.Kestrel.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.KestrelTests.TestHelpers;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.KestrelTests
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue