Make Kestrel's response buffer limit configurable

This commit is contained in:
Stephen Halter 2016-08-24 17:09:34 -07:00
parent acfcafb6e1
commit 1ecef8094a
5 changed files with 274 additions and 60 deletions

View File

@ -15,7 +15,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
public class SocketOutput : ISocketOutput
{
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
// Well behaved WriteAsync users should await returned task, so there is no need to allocate more per connection by default
private const int _initialTaskQueues = 1;
private const int _maxPooledWriteContexts = 32;
@ -26,6 +25,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private readonly KestrelThread _thread;
private readonly UvStreamHandle _socket;
private readonly Connection _connection;
private readonly long? _maxBytesPreCompleted;
private readonly string _connectionId;
private readonly IKestrelTrace _log;
private readonly IThreadPool _threadPool;
@ -51,7 +51,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private bool _postingWrite = false;
private bool _cancelled = false;
private int _numBytesPreCompleted = 0;
private long _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<WaitingTask> _tasksPending;
@ -75,6 +75,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_tasksPending = new Queue<WaitingTask>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
_writeReqPool = thread.WriteReqPool;
_maxBytesPreCompleted = connection.ServerOptions.Limits.MaxResponseBufferSize;
_head = thread.Memory.Lease();
_tail = _head;
@ -146,9 +147,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
_nextWriteContext.SocketDisconnect = true;
}
if (_lastWriteError == null &&
_tasksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted)
if (!_maxBytesPreCompleted.HasValue || _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted.Value)
{
// Complete the write task immediately if all previous write tasks have been completed,
// the buffers haven't grown too large, and the last write to the socket succeeded.
@ -403,7 +402,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
}
}
private void CompleteNextWrite(ref int bytesLeftToBuffer)
private void CompleteNextWrite(ref long bytesLeftToBuffer)
{
// Called inside _contextLock
var waitingTask = _tasksPending.Dequeue();
@ -441,10 +440,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private void CompleteFinishedWrites(int status)
{
if (!_maxBytesPreCompleted.HasValue)
{
Debug.Assert(_tasksPending.Count == 0);
return;
}
// Called inside _contextLock
// bytesLeftToBuffer can be greater than _maxBytesPreCompleted
// This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
var bytesLeftToBuffer = _maxBytesPreCompleted.Value - _numBytesPreCompleted;
while (_tasksPending.Count > 0 &&
(_tasksPending.Peek().BytesToWrite) <= bytesLeftToBuffer)
{
@ -454,8 +459,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Internal.Http
private void CompleteAllWrites()
{
if (!_maxBytesPreCompleted.HasValue)
{
Debug.Assert(_tasksPending.Count == 0);
return;
}
// Called inside _contextLock
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
var bytesLeftToBuffer = _maxBytesPreCompleted.Value - _numBytesPreCompleted;
while (_tasksPending.Count > 0)
{
CompleteNextWrite(ref bytesLeftToBuffer);

View File

@ -7,6 +7,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel
{
public class KestrelServerLimits
{
// Matches the non-configurable default response buffer size for Kestrel in 1.0.0
private long? _maxResponseBufferSize = 64 * 1024;
// Matches the default client_max_body_size in nginx. Also large enough that most requests
// should be under the limit.
private long? _maxRequestBufferSize = 1024 * 1024;
@ -20,6 +23,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel
// Matches the default LimitRequestFields in Apache httpd.
private int _maxRequestHeaderCount = 100;
/// <summary>
/// Gets or sets the maximum size of the response buffer before write
/// calls begin to block or return tasks that don't complete until the
/// buffer size drops below the configured limit.
/// </summary>
/// <remarks>
/// When set to null, the size of the response buffer is unlimited.
/// When set to zero, all write calls will block or return tasks that
/// don't complete until the entire response buffer is flushed.
/// Defaults to 65,536 bytes (64 KB).
/// </remarks>
public long? MaxResponseBufferSize
{
get
{
return _maxResponseBufferSize;
}
set
{
if (value.HasValue && value.Value < 0)
{
throw new ArgumentOutOfRangeException(nameof(value), "Value must be null or a non-negative integer.");
}
_maxResponseBufferSize = value;
}
}
/// <summary>
/// Gets or sets the maximum size of the request buffer.
/// </summary>

View File

@ -9,6 +9,35 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
public class KestrelServerLimitsTests
{
[Fact]
public void MaxResponseBufferSizeDefault()
{
Assert.Equal(64 * 1024, (new KestrelServerLimits()).MaxResponseBufferSize);
}
[Theory]
[InlineData((long)-1)]
[InlineData(long.MinValue)]
public void MaxResponseBufferSizeInvalid(long value)
{
Assert.Throws<ArgumentOutOfRangeException>(() =>
{
(new KestrelServerLimits()).MaxResponseBufferSize = value;
});
}
[Theory]
[InlineData(null)]
[InlineData((long)0)]
[InlineData((long)1)]
[InlineData(long.MaxValue)]
public void MaxResponseBufferSizeValid(long? value)
{
var o = new KestrelServerLimits();
o.MaxResponseBufferSize = value;
Assert.Equal(value, o.MaxResponseBufferSize);
}
[Fact]
public void MaxRequestBufferSizeDefault()
{

View File

@ -5,6 +5,7 @@ using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
@ -15,8 +16,43 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
{
public class SocketOutputTests
{
[Fact]
public async Task CanWrite1MB()
public static TheoryData<KestrelServerOptions> MaxResponseBufferSizeData => new TheoryData<KestrelServerOptions>
{
new KestrelServerOptions(),
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = 0 }
},
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = 1024 }
},
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = 1024 * 1024 }
},
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = null }
},
};
public static TheoryData<KestrelServerOptions> PositiveMaxResponseBufferSizeData => new TheoryData<KestrelServerOptions>
{
new KestrelServerOptions(),
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = 1024 }
},
new KestrelServerOptions
{
Limits = { MaxResponseBufferSize = 1024 * 1024 }
}
};
[Theory]
[MemberData(nameof(MaxResponseBufferSizeData))]
public async Task CanWrite1MB(KestrelServerOptions options)
{
// This test was added because when initially implementing write-behind buffering in
// SocketOutput, the write callback would never be invoked for writes larger than
@ -33,10 +69,10 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp);
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
var bufferSize = 1048576;
// At least one run of this test should have a MaxResponseBufferSize < 1 MB.
var bufferSize = 1024 * 1024;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
// Act
@ -53,10 +89,8 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
[Fact]
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyBuffered()
public async Task NullMaxResponseBufferSizeAllowsUnlimitedBuffer()
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
@ -78,7 +112,121 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var mockConnection = new MockConnection();
var options = new KestrelServerOptions { Limits = { MaxResponseBufferSize = null } };
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp);
// Don't want to allocate anything too huge for perf. This is at least larger than the default buffer.
var bufferSize = 1024 * 1024;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
// Act
var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken));
// Assert
Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status);
// Cleanup
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
await mockLibuv.OnPostTask;
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
}
}
}
[Fact]
public async Task ZeroMaxResponseBufferSizeDisablesBuffering()
{
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
{
OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
}
};
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1);
kestrelEngine.Threads.Add(kestrelThread);
await kestrelThread.StartAsync();
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var options = new KestrelServerOptions { Limits = { MaxResponseBufferSize = 0 } };
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp);
var bufferSize = 1;
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
// Act
var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken));
// Assert
Assert.False(writeTask.IsCompleted);
// Act
await mockLibuv.OnPostTask;
// Finishing the write should allow the task to complete.
Action<int> triggerNextCompleted;
Assert.True(completeQueue.TryDequeue(out triggerNextCompleted));
triggerNextCompleted(0);
// Assert
Assert.Equal(TaskStatus.RanToCompletion, writeTask.Status);
// Cleanup
var cleanupTask = socketOutput.WriteAsync(
default(ArraySegment<byte>), default(CancellationToken), socketDisconnect: true);
// Wait for all writes to complete so the completeQueue isn't modified during enumeration.
await mockLibuv.OnPostTask;
foreach (var triggerCompleted in completeQueue)
{
triggerCompleted(0);
}
}
}
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyBuffered(KestrelServerOptions options)
{
var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value;
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
var mockLibuv = new MockLibuv
{
OnWrite = (socket, buffers, triggerCompleted) =>
{
completeQueue.Enqueue(triggerCompleted);
return 0;
}
};
using (var kestrelEngine = new KestrelEngine(mockLibuv, new TestServiceContext()))
{
var kestrelThread = new KestrelThread(kestrelEngine, maxLoops: 1);
kestrelEngine.Threads.Add(kestrelThread);
await kestrelThread.StartAsync();
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var mockConnection = new MockConnection(options);
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
@ -122,11 +270,11 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered()
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAreAlreadyBuffered(KestrelServerOptions options)
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value;
var completeQueue = new ConcurrentQueue<Action<int>>();
var writeRequested = false;
@ -150,7 +298,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var mockConnection = new MockConnection();
var mockConnection = new MockConnection(options);
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted / 2;
@ -207,11 +355,11 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task OnlyWritesRequestingCancellationAreErroredOnCancellation()
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task OnlyWritesRequestingCancellationAreErroredOnCancellation(KestrelServerOptions options)
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value;
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
@ -234,7 +382,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
using (var mockConnection = new MockConnection())
using (var mockConnection = new MockConnection(options))
{
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
@ -324,11 +472,11 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task FailedWriteCompletesOrCancelsAllPendingTasks()
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task FailedWriteCompletesOrCancelsAllPendingTasks(KestrelServerOptions options)
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value;
var completeQueue = new ConcurrentQueue<Action<int>>();
// Arrange
@ -351,7 +499,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
using (var mockConnection = new MockConnection())
using (var mockConnection = new MockConnection(options))
{
var abortedSource = mockConnection.RequestAbortedSource;
ISocketOutput socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
@ -414,11 +562,11 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task WritesDontGetCompletedTooQuickly()
[Theory]
[MemberData(nameof(PositiveMaxResponseBufferSizeData))]
public async Task WritesDontGetCompletedTooQuickly(KestrelServerOptions options)
{
// This should match _maxBytesPreCompleted in SocketOutput
var maxBytesPreCompleted = 65536;
var maxBytesPreCompleted = (int)options.Limits.MaxResponseBufferSize.Value;
var completeQueue = new ConcurrentQueue<Action<int>>();
var writeCalled = false;
@ -443,7 +591,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var mockConnection = new MockConnection();
var mockConnection = new MockConnection(options);
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var bufferSize = maxBytesPreCompleted;
@ -498,8 +646,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task ProducingStartAndProducingCompleteCanBeUsedDirectly()
[Theory]
[MemberData(nameof(MaxResponseBufferSizeData))]
public async Task ProducingStartAndProducingCompleteCanBeUsedDirectly(KestrelServerOptions options)
{
int nBuffers = 0;
@ -522,7 +671,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(options), "0", trace, ltp);
// block 1
var start = socketOutput.ProducingStart();
@ -549,8 +698,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task OnlyAllowsUpToThreeConcurrentWrites()
[Theory]
[MemberData(nameof(MaxResponseBufferSizeData))]
public async Task OnlyAllowsUpToThreeConcurrentWrites(KestrelServerOptions options)
{
var writeCalled = false;
var completeQueue = new ConcurrentQueue<Action<int>>();
@ -574,7 +724,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var mockConnection = new MockConnection();
var mockConnection = new MockConnection(options);
var socketOutput = new SocketOutput(kestrelThread, socket, mockConnection, "0", trace, ltp);
var buffer = new ArraySegment<byte>(new byte[1]);
@ -619,8 +769,9 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
}
}
[Fact]
public async Task WritesAreAggregated()
[Theory]
[MemberData(nameof(MaxResponseBufferSizeData))]
public async Task WritesAreAggregated(KestrelServerOptions options)
{
var writeCalled = false;
var writeCount = 0;
@ -645,7 +796,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(), "0", trace, ltp);
var socketOutput = new SocketOutput(kestrelThread, socket, new MockConnection(new KestrelServerOptions()), "0", trace, ltp);
mockLibuv.KestrelThreadBlocker.Reset();
@ -691,7 +842,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests
var socket = new MockSocket(mockLibuv, kestrelThread.Loop.ThreadId, new TestKestrelTrace());
var trace = new KestrelTrace(new TestKestrelTrace());
var ltp = new SynchronousThreadPool();
var connection = new MockConnection();
var connection = new MockConnection(new KestrelServerOptions());
var socketOutput = new SocketOutput(kestrelThread, socket, connection, "0", trace, ltp);
// Close SocketOutput

View File

@ -4,25 +4,24 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel;
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
{
public class MockConnection : Connection, IDisposable
{
private TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>();
public MockConnection()
public MockConnection(KestrelServerOptions options)
{
RequestAbortedSource = new CancellationTokenSource();
ServerOptions = options;
}
public override void Abort(Exception error = null)
{
if (RequestAbortedSource != null)
{
RequestAbortedSource.Cancel();
}
RequestAbortedSource?.Cancel();
}
public override void OnSocketClosed()
@ -32,13 +31,7 @@ namespace Microsoft.AspNetCore.Server.KestrelTests.TestHelpers
public CancellationTokenSource RequestAbortedSource { get; }
public Task SocketClosed
{
get
{
return _socketClosedTcs.Task;
}
}
public Task SocketClosed => _socketClosedTcs.Task;
public void Dispose()
{