Merge branch 'release/2.1' into dev
This commit is contained in:
commit
b040e33b1b
|
|
@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
internal sealed class SocketConnection : TransportConnection
|
||||
{
|
||||
private static readonly int MinAllocBufferSize = KestrelMemoryPool.MinimumSegmentSize / 2;
|
||||
private static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
|
||||
|
||||
private readonly Socket _socket;
|
||||
private readonly PipeScheduler _scheduler;
|
||||
|
|
@ -55,11 +54,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
|
||||
ConnectionClosed = _connectionClosedTokenSource.Token;
|
||||
|
||||
// On *nix platforms, Sockets already dispatches to the ThreadPool.
|
||||
var awaiterScheduler = IsWindows ? _scheduler : PipeScheduler.Inline;
|
||||
|
||||
_receiver = new SocketReceiver(_socket, awaiterScheduler);
|
||||
_sender = new SocketSender(_socket, awaiterScheduler);
|
||||
_receiver = new SocketReceiver(_socket, _scheduler);
|
||||
_sender = new SocketSender(_socket, _scheduler);
|
||||
}
|
||||
|
||||
public override MemoryPool<byte> MemoryPool { get; }
|
||||
|
|
@ -69,28 +65,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
|
||||
public async Task StartAsync()
|
||||
{
|
||||
Exception sendError = null;
|
||||
try
|
||||
{
|
||||
// Spawn send and receive logic
|
||||
Task receiveTask = DoReceive();
|
||||
Task<Exception> sendTask = DoSend();
|
||||
|
||||
// If the sending task completes then close the receive
|
||||
// We don't need to do this in the other direction because the kestrel
|
||||
// will trigger the output closing once the input is complete.
|
||||
if (await Task.WhenAny(receiveTask, sendTask) == sendTask)
|
||||
{
|
||||
// Tell the reader it's being aborted
|
||||
_socket.Dispose();
|
||||
}
|
||||
var receiveTask = DoReceive();
|
||||
var sendTask = DoSend();
|
||||
|
||||
// Now wait for both to complete
|
||||
await receiveTask;
|
||||
sendError = await sendTask;
|
||||
await sendTask;
|
||||
|
||||
// Dispose the socket(should noop if already called)
|
||||
_socket.Dispose();
|
||||
_receiver.Dispose();
|
||||
_sender.Dispose();
|
||||
ThreadPool.QueueUserWorkItem(state => ((SocketConnection)state).CancelConnectionClosedToken(), this);
|
||||
|
|
@ -99,18 +83,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Complete the output after disposing the socket
|
||||
Output.Complete(sendError);
|
||||
}
|
||||
}
|
||||
|
||||
public override void Abort()
|
||||
{
|
||||
// Try to gracefully close the socket to match libuv behavior.
|
||||
Shutdown();
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
||||
private async Task DoReceive()
|
||||
|
|
@ -205,7 +183,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
}
|
||||
}
|
||||
|
||||
private async Task<Exception> DoSend()
|
||||
private async Task DoSend()
|
||||
{
|
||||
Exception error = null;
|
||||
|
||||
|
|
@ -229,10 +207,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
{
|
||||
error = new IOException(ex.Message, ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Shutdown();
|
||||
|
||||
Shutdown();
|
||||
|
||||
return error;
|
||||
// Complete the output after disposing the socket
|
||||
Output.Complete(error);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessSends()
|
||||
|
|
@ -280,8 +261,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
|
|||
_aborted = true;
|
||||
_trace.ConnectionWriteFin(ConnectionId);
|
||||
|
||||
// Try to gracefully close the socket even for aborts to match libuv behavior.
|
||||
_socket.Shutdown(SocketShutdown.Both);
|
||||
try
|
||||
{
|
||||
// Try to gracefully close the socket even for aborts to match libuv behavior.
|
||||
_socket.Shutdown(SocketShutdown.Both);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Ignore any errors from Socket.Shutdown since we're tearing down the connection anyway.
|
||||
}
|
||||
|
||||
_socket.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ using Microsoft.AspNetCore.Http;
|
|||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
|
|
@ -1208,6 +1209,149 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
Assert.Equal(2, abortedRequestId);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task ServerCanAbortConnectionAfterUnobservedClose(ListenOptions listenOptions)
|
||||
{
|
||||
const int connectionPausedEventId = 4;
|
||||
const int connectionFinSentEventId = 7;
|
||||
const int maxRequestBufferSize = 4096;
|
||||
|
||||
var readCallbackUnwired = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
var clientClosedConnection = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
var serverClosedConnection = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
var appFuncCompleted = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var mockLogger = new Mock<ILogger>();
|
||||
mockLogger
|
||||
.Setup(logger => logger.IsEnabled(It.IsAny<LogLevel>()))
|
||||
.Returns(true);
|
||||
mockLogger
|
||||
.Setup(logger => logger.Log(It.IsAny<LogLevel>(), It.IsAny<EventId>(), It.IsAny<object>(), It.IsAny<Exception>(), It.IsAny<Func<object, Exception, string>>()))
|
||||
.Callback<LogLevel, EventId, object, Exception, Func<object, Exception, string>>((logLevel, eventId, state, exception, formatter) =>
|
||||
{
|
||||
if (eventId.Id == connectionPausedEventId)
|
||||
{
|
||||
readCallbackUnwired.TrySetResult(null);
|
||||
}
|
||||
else if (eventId.Id == connectionFinSentEventId)
|
||||
{
|
||||
serverClosedConnection.SetResult(null);
|
||||
}
|
||||
|
||||
Logger.Log(logLevel, eventId, state, exception, formatter);
|
||||
});
|
||||
|
||||
var mockLoggerFactory = new Mock<ILoggerFactory>();
|
||||
mockLoggerFactory
|
||||
.Setup(factory => factory.CreateLogger(It.IsAny<string>()))
|
||||
.Returns(Logger);
|
||||
mockLoggerFactory
|
||||
.Setup(factory => factory.CreateLogger(It.IsIn("Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv",
|
||||
"Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets")))
|
||||
.Returns(mockLogger.Object);
|
||||
|
||||
var mockKestrelTrace = new Mock<KestrelTrace>(Logger) { CallBase = true };
|
||||
var testContext = new TestServiceContext(mockLoggerFactory.Object)
|
||||
{
|
||||
Log = mockKestrelTrace.Object,
|
||||
ServerOptions =
|
||||
{
|
||||
Limits =
|
||||
{
|
||||
MaxRequestBufferSize = maxRequestBufferSize,
|
||||
MaxRequestLineSize = maxRequestBufferSize,
|
||||
MaxRequestHeadersTotalSize = maxRequestBufferSize,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var scratchBuffer = new byte[maxRequestBufferSize * 2 + 1];
|
||||
|
||||
using (var server = new TestServer(async context =>
|
||||
{
|
||||
await clientClosedConnection.Task;
|
||||
|
||||
context.Abort();
|
||||
|
||||
await serverClosedConnection.Task;
|
||||
|
||||
// TaskContinuationOptions.RunContinuationsAsynchronously sometimes runs inline anyway in
|
||||
// situations such as this where the awaiter starts awaiting right when SetResult is called.
|
||||
_ = Task.Run(() => appFuncCompleted.SetResult(null));
|
||||
}, testContext, listenOptions))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
$"Content-Length: {scratchBuffer.Length}",
|
||||
"",
|
||||
"");
|
||||
|
||||
var ignore = connection.Stream.WriteAsync(scratchBuffer, 0, scratchBuffer.Length);
|
||||
|
||||
// Wait until the read callback is no longer hooked up so that the connection disconnect isn't observed.
|
||||
await readCallbackUnwired.Task.TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
|
||||
clientClosedConnection.SetResult(null);
|
||||
|
||||
await appFuncCompleted.Task.TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
|
||||
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task AppCanHandleClientAbortingConnectionMidRequest(ListenOptions listenOptions)
|
||||
{
|
||||
var readTcs = new TaskCompletionSource<Exception>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var mockKestrelTrace = new Mock<KestrelTrace>(Logger) { CallBase = true };
|
||||
var testContext = new TestServiceContext()
|
||||
{
|
||||
Log = mockKestrelTrace.Object,
|
||||
};
|
||||
|
||||
var scratchBuffer = new byte[4096];
|
||||
|
||||
using (var server = new TestServer(async context =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await context.Request.Body.CopyToAsync(Stream.Null);;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
readTcs.SetException(ex);
|
||||
throw;
|
||||
}
|
||||
|
||||
readTcs.SetException(new Exception("This shouldn't be reached."));
|
||||
|
||||
}, testContext, listenOptions))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
$"Content-Length: {scratchBuffer.Length * 2}",
|
||||
"",
|
||||
"");
|
||||
|
||||
await connection.Stream.WriteAsync(scratchBuffer, 0, scratchBuffer.Length);
|
||||
}
|
||||
|
||||
await Assert.ThrowsAnyAsync<IOException>(() => readTcs.Task).TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
|
||||
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task RequestHeadersAreResetOnEachRequest(ListenOptions listenOptions)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ using Microsoft.AspNetCore.Http;
|
|||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Https;
|
||||
|
|
@ -2328,6 +2329,185 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task WritingToConnectionAfterUnobservedCloseTriggersRequestAbortedToken(ListenOptions listenOptions)
|
||||
{
|
||||
const int connectionPausedEventId = 4;
|
||||
const int maxRequestBufferSize = 4096;
|
||||
|
||||
var requestAborted = false;
|
||||
var readCallbackUnwired = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
var clientClosedConnection = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
var writeTcs = new TaskCompletionSource<Exception>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var mockKestrelTrace = new Mock<KestrelTrace>(Logger) { CallBase = true };
|
||||
var mockLogger = new Mock<ILogger>();
|
||||
mockLogger
|
||||
.Setup(logger => logger.IsEnabled(It.IsAny<LogLevel>()))
|
||||
.Returns(true);
|
||||
mockLogger
|
||||
.Setup(logger => logger.Log(It.IsAny<LogLevel>(), It.IsAny<EventId>(), It.IsAny<object>(), It.IsAny<Exception>(), It.IsAny<Func<object, Exception, string>>()))
|
||||
.Callback<LogLevel, EventId, object, Exception, Func<object, Exception, string>>((logLevel, eventId, state, exception, formatter) =>
|
||||
{
|
||||
if (eventId.Id == connectionPausedEventId)
|
||||
{
|
||||
readCallbackUnwired.TrySetResult(null);
|
||||
}
|
||||
|
||||
Logger.Log(logLevel, eventId, state, exception, formatter);
|
||||
});
|
||||
|
||||
var mockLoggerFactory = new Mock<ILoggerFactory>();
|
||||
mockLoggerFactory
|
||||
.Setup(factory => factory.CreateLogger(It.IsAny<string>()))
|
||||
.Returns(Logger);
|
||||
mockLoggerFactory
|
||||
.Setup(factory => factory.CreateLogger(It.IsIn("Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv",
|
||||
"Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets")))
|
||||
.Returns(mockLogger.Object);
|
||||
|
||||
var testContext = new TestServiceContext(mockLoggerFactory.Object)
|
||||
{
|
||||
Log = mockKestrelTrace.Object,
|
||||
ServerOptions =
|
||||
{
|
||||
Limits =
|
||||
{
|
||||
MaxRequestBufferSize = maxRequestBufferSize,
|
||||
MaxRequestLineSize = maxRequestBufferSize,
|
||||
MaxRequestHeadersTotalSize = maxRequestBufferSize,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var scratchBuffer = new byte[maxRequestBufferSize * 2 + 1];
|
||||
|
||||
using (var server = new TestServer(async context =>
|
||||
{
|
||||
context.RequestAborted.Register(() =>
|
||||
{
|
||||
requestAborted = true;
|
||||
});
|
||||
|
||||
await clientClosedConnection.Task;
|
||||
|
||||
try
|
||||
{
|
||||
for (var i = 0; i < 1000; i++)
|
||||
{
|
||||
await context.Response.Body.WriteAsync(scratchBuffer, 0, scratchBuffer.Length, context.RequestAborted);
|
||||
await Task.Delay(10);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// TaskContinuationOptions.RunContinuationsAsynchronously sometimes runs inline anyway in
|
||||
// situations such as this where the awaiter starts awaiting right when SetResult is called.
|
||||
_ = Task.Run(() => writeTcs.SetException(ex));
|
||||
throw;
|
||||
}
|
||||
|
||||
writeTcs.SetException(new Exception("This shouldn't be reached."));
|
||||
}, testContext, listenOptions))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
$"Content-Length: {scratchBuffer.Length}",
|
||||
"",
|
||||
"");
|
||||
|
||||
var ignore = connection.Stream.WriteAsync(scratchBuffer, 0, scratchBuffer.Length);
|
||||
|
||||
// Wait until the read callback is no longer hooked up so that the connection disconnect isn't observed.
|
||||
await readCallbackUnwired.Task.TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
|
||||
clientClosedConnection.SetResult(null);
|
||||
|
||||
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => writeTcs.Task).TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
|
||||
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
|
||||
Assert.True(requestAborted);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task AppCanHandleClientAbortingConnectionMidResponse(ListenOptions listenOptions)
|
||||
{
|
||||
const int responseBodySegmentSize = 65536;
|
||||
const int responseBodySegmentCount = 100;
|
||||
const int responseBodySize = responseBodySegmentSize * responseBodySegmentCount;
|
||||
|
||||
var requestAborted = false;
|
||||
var appCompletedTcs = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var mockKestrelTrace = new Mock<KestrelTrace>(Logger) { CallBase = true };
|
||||
var testContext = new TestServiceContext()
|
||||
{
|
||||
Log = mockKestrelTrace.Object,
|
||||
};
|
||||
|
||||
var scratchBuffer = new byte[responseBodySegmentSize];
|
||||
|
||||
using (var server = new TestServer(async context =>
|
||||
{
|
||||
context.RequestAborted.Register(() =>
|
||||
{
|
||||
requestAborted = true;
|
||||
});
|
||||
|
||||
context.Response.ContentLength = responseBodySize;
|
||||
|
||||
try
|
||||
{
|
||||
for (var i = 0; i < responseBodySegmentCount; i++)
|
||||
{
|
||||
await context.Response.Body.WriteAsync(scratchBuffer, 0, scratchBuffer.Length);
|
||||
await Task.Delay(10);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
// WriteAsync shouldn't throw without a CancellationToken passed in. Unfortunately a ECONNRESET UvException sometimes gets thrown.
|
||||
// This will be fixed by https://github.com/aspnet/KestrelHttpServer/pull/2547
|
||||
// TaskContinuationOptions.RunContinuationsAsynchronously sometimes runs inline anyway in
|
||||
// situations such as this where the awaiter starts awaiting right when SetResult is called.
|
||||
_ = Task.Run(() => appCompletedTcs.SetResult(null));
|
||||
}
|
||||
}, testContext, listenOptions))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
await connection.Send(
|
||||
"GET / HTTP/1.1",
|
||||
"Host:",
|
||||
"",
|
||||
"");
|
||||
|
||||
var readCount = 0;
|
||||
|
||||
// Read just part of the response and close the connection.
|
||||
// https://github.com/aspnet/KestrelHttpServer/issues/2554
|
||||
for (var i = 0; i < responseBodySegmentCount / 10; i++)
|
||||
{
|
||||
readCount += await connection.Stream.ReadAsync(scratchBuffer, 0, scratchBuffer.Length);
|
||||
}
|
||||
|
||||
connection.Socket.Shutdown(SocketShutdown.Send);
|
||||
|
||||
await appCompletedTcs.Task.TimeoutAfter(TestConstants.DefaultTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
|
||||
Assert.True(requestAborted);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[MemberData(nameof(ConnectionAdapterData))]
|
||||
public async Task NoErrorsLoggedWhenServerEndsConnectionBeforeClient(ListenOptions listenOptions)
|
||||
|
|
@ -2643,6 +2823,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
|
||||
var connectionStopMessageLogged = new TaskCompletionSource<object>();
|
||||
var requestAborted = new TaskCompletionSource<object>();
|
||||
var appFuncCompleted = new TaskCompletionSource<object>();
|
||||
|
||||
var mockKestrelTrace = new Mock<IKestrelTrace>();
|
||||
mockKestrelTrace
|
||||
|
|
@ -2650,7 +2831,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
.Callback(() => responseRateTimeoutMessageLogged.SetResult(null));
|
||||
mockKestrelTrace
|
||||
.Setup(trace => trace.ConnectionStop(It.IsAny<string>()))
|
||||
.Callback(() => connectionStopMessageLogged.SetResult(null));;
|
||||
.Callback(() => connectionStopMessageLogged.SetResult(null));
|
||||
|
||||
var testContext = new TestServiceContext
|
||||
{
|
||||
|
|
@ -2661,7 +2842,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
{
|
||||
Limits =
|
||||
{
|
||||
MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2))
|
||||
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -2677,10 +2858,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
|
||||
context.Response.ContentLength = responseSize;
|
||||
|
||||
for (var i = 0; i < chunks; i++)
|
||||
try
|
||||
{
|
||||
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
|
||||
appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize);
|
||||
for (var i = 0; i < chunks; i++)
|
||||
{
|
||||
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
|
||||
appLogger.LogInformation("Wrote chunk of {chunkSize} bytes", chunkSize);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
appFuncCompleted.SetResult(null);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2703,7 +2892,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
|
||||
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
|
||||
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await AssertStreamAborted(connection.Reader.BaseStream, chunkSize * chunks);
|
||||
|
||||
sw.Stop();
|
||||
|
|
@ -2725,6 +2914,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
|
||||
var connectionStopMessageLogged = new TaskCompletionSource<object>();
|
||||
var aborted = new TaskCompletionSource<object>();
|
||||
var appFuncCompleted = new TaskCompletionSource<object>();
|
||||
|
||||
var mockKestrelTrace = new Mock<IKestrelTrace>();
|
||||
mockKestrelTrace
|
||||
|
|
@ -2741,7 +2931,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
{
|
||||
Limits =
|
||||
{
|
||||
MinResponseDataRate = new MinDataRate(bytesPerSecond: double.MaxValue, gracePeriod: TimeSpan.FromSeconds(2))
|
||||
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -2763,9 +2953,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
|
||||
context.Response.ContentLength = chunks * chunkSize;
|
||||
|
||||
for (var i = 0; i < chunks; i++)
|
||||
try
|
||||
{
|
||||
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
|
||||
for (var i = 0; i < chunks; i++)
|
||||
{
|
||||
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
appFuncCompleted.SetResult(null);
|
||||
throw;
|
||||
}
|
||||
}, testContext, listenOptions))
|
||||
{
|
||||
|
|
@ -2781,6 +2979,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
await aborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
|
||||
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
|
||||
// Temporary workaround for a deadlock when reading from an aborted client SslStream on Mac and Linux.
|
||||
if (TestPlatformHelper.IsWindows)
|
||||
|
|
@ -2796,6 +2995,93 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectionClosedWhenBothRequestAndResponseExperienceBackPressure()
|
||||
{
|
||||
const int bufferSize = 1024;
|
||||
const int bufferCount = 256 * 1024;
|
||||
var responseSize = bufferCount * bufferSize;
|
||||
var buffer = new byte[bufferSize];
|
||||
|
||||
var responseRateTimeoutMessageLogged = new TaskCompletionSource<object>();
|
||||
var connectionStopMessageLogged = new TaskCompletionSource<object>();
|
||||
var requestAborted = new TaskCompletionSource<object>();
|
||||
var appFuncCompleted = new TaskCompletionSource<object>();
|
||||
|
||||
var mockKestrelTrace = new Mock<IKestrelTrace>();
|
||||
mockKestrelTrace
|
||||
.Setup(trace => trace.ResponseMininumDataRateNotSatisfied(It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback(() => responseRateTimeoutMessageLogged.SetResult(null));
|
||||
mockKestrelTrace
|
||||
.Setup(trace => trace.ConnectionStop(It.IsAny<string>()))
|
||||
.Callback(() => connectionStopMessageLogged.SetResult(null));
|
||||
|
||||
var testContext = new TestServiceContext
|
||||
{
|
||||
LoggerFactory = LoggerFactory,
|
||||
Log = mockKestrelTrace.Object,
|
||||
SystemClock = new SystemClock(),
|
||||
ServerOptions =
|
||||
{
|
||||
Limits =
|
||||
{
|
||||
MinResponseDataRate = new MinDataRate(bytesPerSecond: 1024 * 1024, gracePeriod: TimeSpan.FromSeconds(2)),
|
||||
MaxRequestBodySize = responseSize
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
|
||||
async Task App(HttpContext context)
|
||||
{
|
||||
context.RequestAborted.Register(() =>
|
||||
{
|
||||
requestAborted.SetResult(null);
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
await context.Request.Body.CopyToAsync(context.Response.Body);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// This should always throw an OperationCanceledException. Unfortunately a ECONNRESET UvException sometimes gets thrown.
|
||||
// This will be fixed by https://github.com/aspnet/KestrelHttpServer/pull/2547
|
||||
appFuncCompleted.SetResult(null);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
using (var server = new TestServer(App, testContext, listenOptions))
|
||||
{
|
||||
using (var connection = server.CreateConnection())
|
||||
{
|
||||
// Close the connection with the last request so AssertStreamCompleted actually completes.
|
||||
await connection.Send(
|
||||
"POST / HTTP/1.1",
|
||||
"Host:",
|
||||
$"Content-Length: {responseSize}",
|
||||
"",
|
||||
"");
|
||||
|
||||
var sendTask = Task.Run(async () =>
|
||||
{
|
||||
for (var i = 0; i < bufferCount; i++)
|
||||
{
|
||||
await connection.Stream.WriteAsync(buffer, 0, buffer.Length);
|
||||
}
|
||||
});
|
||||
|
||||
await requestAborted.Task.TimeoutAfter(TimeSpan.FromSeconds(60));
|
||||
await responseRateTimeoutMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await connectionStopMessageLogged.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
await AssertStreamAborted(connection.Stream, responseSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseChunks()
|
||||
{
|
||||
|
|
@ -2804,6 +3090,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
var chunkData = new byte[chunkSize];
|
||||
|
||||
var requestAborted = false;
|
||||
var appFuncCompleted = new TaskCompletionSource<object>();
|
||||
var mockKestrelTrace = new Mock<IKestrelTrace>();
|
||||
|
||||
var testContext = new TestServiceContext
|
||||
|
|
@ -2832,6 +3119,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
{
|
||||
await context.Response.Body.WriteAsync(chunkData, 0, chunkData.Length, context.RequestAborted);
|
||||
}
|
||||
|
||||
appFuncCompleted.SetResult(null);
|
||||
}
|
||||
|
||||
using (var server = new TestServer(App, testContext, listenOptions))
|
||||
|
|
@ -2851,6 +3140,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
// Make sure consuming a single chunk exceeds the 2 second timeout.
|
||||
var targetBytesPerSecond = chunkSize / 4;
|
||||
await AssertStreamCompleted(connection.Reader.BaseStream, minTotalOutputSize, targetBytesPerSecond);
|
||||
await appFuncCompleted.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
|
||||
mockKestrelTrace.Verify(t => t.ResponseMininumDataRateNotSatisfied(It.IsAny<string>(), It.IsAny<string>()), Times.Never());
|
||||
mockKestrelTrace.Verify(t => t.ConnectionStop(It.IsAny<string>()), Times.Once());
|
||||
|
|
@ -2860,7 +3150,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders()
|
||||
public async Task ConnectionNotClosedWhenClientSatisfiesMinimumDataRateGivenLargeResponseHeaders()
|
||||
{
|
||||
var headerSize = 1024 * 1024; // 1 MB for each header value
|
||||
var headerCount = 64; // 64 MB of headers per response
|
||||
|
|
@ -2934,7 +3224,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
[Fact]
|
||||
public async Task NonZeroContentLengthFor304StatusCodeIsAllowed()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
|
||||
namespace Microsoft.AspNetCore.Testing
|
||||
{
|
||||
|
|
@ -14,7 +15,7 @@ namespace Microsoft.AspNetCore.Testing
|
|||
|
||||
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
|
||||
{
|
||||
var adapted = new AdaptedConnection(new LoggingStream(context.ConnectionStream, NullLogger.Instance));
|
||||
var adapted = new AdaptedConnection(new PassThroughStream(context.ConnectionStream));
|
||||
return Task.FromResult<IAdaptedConnection>(adapted);
|
||||
}
|
||||
|
||||
|
|
@ -31,5 +32,140 @@ namespace Microsoft.AspNetCore.Testing
|
|||
{
|
||||
}
|
||||
}
|
||||
|
||||
private class PassThroughStream : Stream
|
||||
{
|
||||
private readonly Stream _innerStream;
|
||||
|
||||
public PassThroughStream(Stream innerStream)
|
||||
{
|
||||
_innerStream = innerStream;
|
||||
}
|
||||
|
||||
public override bool CanRead => _innerStream.CanRead;
|
||||
|
||||
public override bool CanSeek => _innerStream.CanSeek;
|
||||
|
||||
public override bool CanTimeout => _innerStream.CanTimeout;
|
||||
|
||||
public override bool CanWrite => _innerStream.CanWrite;
|
||||
|
||||
public override long Length => _innerStream.Length;
|
||||
|
||||
public override long Position { get => _innerStream.Position; set => _innerStream.Position = value; }
|
||||
|
||||
public override int ReadTimeout { get => _innerStream.ReadTimeout; set => _innerStream.ReadTimeout = value; }
|
||||
|
||||
public override int WriteTimeout { get => _innerStream.WriteTimeout; set => _innerStream.WriteTimeout = value; }
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
return _innerStream.Read(buffer, offset, count);
|
||||
}
|
||||
|
||||
public override int ReadByte()
|
||||
{
|
||||
return _innerStream.ReadByte();
|
||||
}
|
||||
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
return _innerStream.ReadAsync(buffer, offset, count, cancellationToken);
|
||||
}
|
||||
|
||||
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
|
||||
{
|
||||
return _innerStream.BeginRead(buffer, offset, count, callback, state);
|
||||
}
|
||||
|
||||
public override int EndRead(IAsyncResult asyncResult)
|
||||
{
|
||||
return _innerStream.EndRead(asyncResult);
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
_innerStream.Write(buffer, offset, count);
|
||||
}
|
||||
|
||||
|
||||
public override void WriteByte(byte value)
|
||||
{
|
||||
_innerStream.WriteByte(value);
|
||||
}
|
||||
|
||||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
return _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
|
||||
}
|
||||
|
||||
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
|
||||
{
|
||||
return _innerStream.BeginWrite(buffer, offset, count, callback, state);
|
||||
}
|
||||
|
||||
public override void EndWrite(IAsyncResult asyncResult)
|
||||
{
|
||||
_innerStream.EndWrite(asyncResult);
|
||||
}
|
||||
|
||||
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
|
||||
{
|
||||
return _innerStream.CopyToAsync(destination, bufferSize, cancellationToken);
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
_innerStream.Flush();
|
||||
}
|
||||
|
||||
public override Task FlushAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return _innerStream.FlushAsync();
|
||||
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
return _innerStream.Seek(offset, origin);
|
||||
}
|
||||
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
_innerStream.SetLength(value);
|
||||
}
|
||||
|
||||
public override void Close()
|
||||
{
|
||||
_innerStream.Close();
|
||||
}
|
||||
|
||||
#if NETCOREAPP2_1
|
||||
public override int Read(Span<byte> buffer)
|
||||
{
|
||||
return _innerStream.Read(buffer);
|
||||
}
|
||||
|
||||
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return _innerStream.ReadAsync(buffer, cancellationToken);
|
||||
}
|
||||
|
||||
public override void Write(ReadOnlySpan<byte> buffer)
|
||||
{
|
||||
_innerStream.Write(buffer);
|
||||
}
|
||||
|
||||
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return _innerStream.WriteAsync(buffer, cancellationToken);
|
||||
}
|
||||
|
||||
public override void CopyTo(Stream destination, int bufferSize)
|
||||
{
|
||||
_innerStream.CopyTo(destination, bufferSize);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ namespace Microsoft.AspNetCore.Testing
|
|||
|
||||
public Socket Socket => _socket;
|
||||
|
||||
public Stream Stream => _stream;
|
||||
|
||||
public StreamReader Reader => _reader;
|
||||
|
||||
public void Dispose()
|
||||
|
|
|
|||
Loading…
Reference in New Issue