Simplify connection lifetime control flow (#1776)
* Also make IAdaptedConnection disposable
This commit is contained in:
parent
28b479c99a
commit
6e2fdda162
|
|
@ -5,7 +5,9 @@ using System;
|
|||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
||||
{
|
||||
|
|
@ -15,16 +17,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
|
||||
private readonly Stream _filteredStream;
|
||||
private readonly StreamSocketOutput _output;
|
||||
private readonly IKestrelTrace _trace;
|
||||
|
||||
public AdaptedPipeline(
|
||||
Stream filteredStream,
|
||||
IPipe inputPipe,
|
||||
IPipe outputPipe)
|
||||
IPipe outputPipe,
|
||||
IKestrelTrace trace)
|
||||
{
|
||||
Input = inputPipe;
|
||||
_output = new StreamSocketOutput(filteredStream, outputPipe);
|
||||
|
||||
_filteredStream = filteredStream;
|
||||
_trace = trace;
|
||||
}
|
||||
|
||||
public IPipe Input { get; }
|
||||
|
|
@ -33,14 +37,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
|
||||
public async Task RunAsync()
|
||||
{
|
||||
var inputTask = ReadInputAsync();
|
||||
var outputTask = _output.WriteOutputAsync();
|
||||
try
|
||||
{
|
||||
var inputTask = ReadInputAsync();
|
||||
var outputTask = _output.WriteOutputAsync();
|
||||
|
||||
await inputTask;
|
||||
await inputTask;
|
||||
|
||||
_output.Dispose();
|
||||
_output.Dispose();
|
||||
|
||||
await outputTask;
|
||||
await outputTask;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// adaptedPipeline.RunAsync() shouldn't throw, unless filtered stream's WriteAsync throws.
|
||||
_trace.LogError(0, ex, $"{nameof(AdaptedPipeline)}.{nameof(RunAsync)}");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReadInputAsync()
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
// Copyright (c) .NET Foundation. All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
||||
{
|
||||
public interface IAdaptedConnection
|
||||
public interface IAdaptedConnection : IDisposable
|
||||
{
|
||||
Stream ConnectionStream { get; }
|
||||
|
||||
|
|
|
|||
|
|
@ -5,10 +5,9 @@ using System;
|
|||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
||||
{
|
||||
public class LoggingConnectionAdapter : IConnectionAdapter
|
||||
{
|
||||
|
|
@ -44,6 +43,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter
|
|||
public void PrepareRequest(IFeatureCollection requestFeatures)
|
||||
{
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -209,5 +209,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
|
|||
}, tcs, cancellationToken);
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
// _output is disposed by ConnectionLifetimeControl
|
||||
_input.Complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
||||
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
KestrelEventSource.Log.ConnectionStart(connection, connectionInfo);
|
||||
|
||||
// Since data cannot be added to the inputPipe by the transport until OnConnection returns,
|
||||
// Frame.RequestProcessingAsync is guaranteed to unblock the transport thread before calling
|
||||
// Frame.ProcessRequestsAsync is guaranteed to unblock the transport thread before calling
|
||||
// application code.
|
||||
connection.StartRequestProcessing();
|
||||
|
||||
|
|
|
|||
|
|
@ -21,23 +21,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
private readonly FrameConnectionContext _context;
|
||||
private readonly Frame _frame;
|
||||
private readonly List<IConnectionAdapter> _connectionAdapters;
|
||||
private readonly TaskCompletionSource<bool> _frameStartedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
private long _lastTimestamp;
|
||||
private long _timeoutTimestamp = long.MaxValue;
|
||||
private TimeoutAction _timeoutAction;
|
||||
|
||||
private AdaptedPipeline _adaptedPipeline;
|
||||
private Task _lifetimeTask;
|
||||
private Stream _filteredStream;
|
||||
private Task _adaptedPipelineTask;
|
||||
|
||||
public FrameConnection(FrameConnectionContext context)
|
||||
{
|
||||
_context = context;
|
||||
_frame = context.Frame;
|
||||
_connectionAdapters = context.ConnectionAdapters;
|
||||
context.ServiceContext.ConnectionManager.AddConnection(context.FrameConnectionId, this);
|
||||
}
|
||||
|
||||
public string ConnectionId => _context.ConnectionId;
|
||||
|
|
@ -67,51 +64,82 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
|
||||
public void StartRequestProcessing()
|
||||
{
|
||||
_frame.Input = _context.Input.Reader;
|
||||
_frame.Output = _context.OutputProducer;
|
||||
_frame.TimeoutControl = this;
|
||||
_lifetimeTask = ProcessRequestsAsync();
|
||||
}
|
||||
|
||||
if (_connectionAdapters.Count == 0)
|
||||
private async Task ProcessRequestsAsync()
|
||||
{
|
||||
RawStream rawStream = null;
|
||||
|
||||
try
|
||||
{
|
||||
StartFrame();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Ensure that IConnectionAdapter.OnConnectionAsync does not run on the transport thread.
|
||||
_context.ServiceContext.ThreadPool.UnsafeRun(state =>
|
||||
Task adaptedPipelineTask = Task.CompletedTask;
|
||||
|
||||
if (_connectionAdapters.Count == 0)
|
||||
{
|
||||
// ApplyConnectionAdaptersAsync should never throw. If it succeeds, it will call _frame.Start().
|
||||
// Otherwise, it will close the connection.
|
||||
var ignore = ((FrameConnection)state).ApplyConnectionAdaptersAsync();
|
||||
}, this);
|
||||
_frame.Input = _context.Input.Reader;
|
||||
_frame.Output = _context.OutputProducer;
|
||||
}
|
||||
else
|
||||
{
|
||||
rawStream = new RawStream(_context.Input.Reader, _context.OutputProducer);
|
||||
|
||||
try
|
||||
{
|
||||
var adaptedPipeline = await ApplyConnectionAdaptersAsync(rawStream);
|
||||
|
||||
_frame.Input = adaptedPipeline.Input.Reader;
|
||||
_frame.Output = adaptedPipeline.Output;
|
||||
|
||||
adaptedPipelineTask = adaptedPipeline.RunAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}.");
|
||||
|
||||
// Since Frame.ProcessRequestsAsync() isn't called, we have to close the socket here.
|
||||
_context.OutputProducer.Dispose();
|
||||
|
||||
await _socketClosedTcs.Task;
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
_frame.TimeoutControl = this;
|
||||
_lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks;
|
||||
_context.ServiceContext.ConnectionManager.AddConnection(_context.FrameConnectionId, this);
|
||||
|
||||
await _frame.ProcessRequestsAsync();
|
||||
await adaptedPipelineTask;
|
||||
await _socketClosedTcs.Task;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.LogError(0, ex, $"Unexpected exception in {nameof(FrameConnection)}.{nameof(ProcessRequestsAsync)}.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId);
|
||||
rawStream?.Dispose();
|
||||
DisposeAdaptedConnections();
|
||||
}
|
||||
}
|
||||
|
||||
public async void OnConnectionClosed(Exception ex)
|
||||
public void OnConnectionClosed(Exception ex)
|
||||
{
|
||||
// Abort the connection (if it isn't already aborted)
|
||||
_frame.Abort(ex);
|
||||
|
||||
Log.ConnectionStop(ConnectionId);
|
||||
KestrelEventSource.Log.ConnectionStop(this);
|
||||
_socketClosedTcs.SetResult(null);
|
||||
|
||||
// The connection is already in the "aborted" state by this point, but we want to track it
|
||||
// until RequestProcessingAsync completes for graceful shutdown.
|
||||
await StopAsync();
|
||||
|
||||
_context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId);
|
||||
_socketClosedTcs.TrySetResult(null);
|
||||
}
|
||||
|
||||
public async Task StopAsync()
|
||||
public Task StopAsync()
|
||||
{
|
||||
if (await _frameStartedTcs.Task)
|
||||
{
|
||||
await _frame.StopAsync();
|
||||
await (_adaptedPipelineTask ?? Task.CompletedTask);
|
||||
}
|
||||
|
||||
await _socketClosedTcs.Task;
|
||||
_frame.Stop();
|
||||
return _lifetimeTask;
|
||||
}
|
||||
|
||||
public void Abort(Exception ex)
|
||||
|
|
@ -122,7 +150,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
public Task AbortAsync(Exception ex)
|
||||
{
|
||||
_frame.Abort(ex);
|
||||
return StopAsync();
|
||||
return _lifetimeTask;
|
||||
}
|
||||
|
||||
public void Timeout()
|
||||
|
|
@ -130,76 +158,37 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
_frame.SetBadRequestState(RequestRejectionReason.RequestTimeout);
|
||||
}
|
||||
|
||||
private async Task ApplyConnectionAdaptersAsync()
|
||||
private async Task<AdaptedPipeline> ApplyConnectionAdaptersAsync(RawStream rawStream)
|
||||
{
|
||||
try
|
||||
{
|
||||
var rawSocketOutput = _frame.Output;
|
||||
var rawStream = new RawStream(_frame.Input, rawSocketOutput);
|
||||
var adapterContext = new ConnectionAdapterContext(rawStream);
|
||||
var adaptedConnections = new IAdaptedConnection[_connectionAdapters.Count];
|
||||
var adapterContext = new ConnectionAdapterContext(rawStream);
|
||||
var adaptedConnections = new IAdaptedConnection[_connectionAdapters.Count];
|
||||
|
||||
for (var i = 0; i < _connectionAdapters.Count; i++)
|
||||
for (var i = 0; i < _connectionAdapters.Count; i++)
|
||||
{
|
||||
var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext);
|
||||
adaptedConnections[i] = adaptedConnection;
|
||||
adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream);
|
||||
}
|
||||
|
||||
_filteredStream = adapterContext.ConnectionStream;
|
||||
_frame.AdaptedConnections = adaptedConnections;
|
||||
|
||||
return new AdaptedPipeline(_filteredStream,
|
||||
PipeFactory.Create(AdaptedInputPipeOptions),
|
||||
PipeFactory.Create(AdaptedOutputPipeOptions),
|
||||
Log);
|
||||
}
|
||||
|
||||
private void DisposeAdaptedConnections()
|
||||
{
|
||||
var adaptedConnections = _frame.AdaptedConnections;
|
||||
if (adaptedConnections != null)
|
||||
{
|
||||
for (int i = adaptedConnections.Length - 1; i >= 0; i--)
|
||||
{
|
||||
var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext);
|
||||
adaptedConnections[i] = adaptedConnection;
|
||||
adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream);
|
||||
adaptedConnections[i].Dispose();
|
||||
}
|
||||
|
||||
if (adapterContext.ConnectionStream != rawStream)
|
||||
{
|
||||
_filteredStream = adapterContext.ConnectionStream;
|
||||
_adaptedPipeline = new AdaptedPipeline(
|
||||
adapterContext.ConnectionStream,
|
||||
PipeFactory.Create(AdaptedInputPipeOptions),
|
||||
PipeFactory.Create(AdaptedOutputPipeOptions));
|
||||
|
||||
_frame.Input = _adaptedPipeline.Input.Reader;
|
||||
_frame.Output = _adaptedPipeline.Output;
|
||||
|
||||
_adaptedPipelineTask = RunAdaptedPipeline();
|
||||
}
|
||||
|
||||
_frame.AdaptedConnections = adaptedConnections;
|
||||
StartFrame();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}.");
|
||||
_frameStartedTcs.SetResult(false);
|
||||
CloseRawPipes();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunAdaptedPipeline()
|
||||
{
|
||||
try
|
||||
{
|
||||
await _adaptedPipeline.RunAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// adaptedPipeline.RunAsync() shouldn't throw.
|
||||
Log.LogError(0, ex, $"{nameof(FrameConnection)}.{nameof(ApplyConnectionAdaptersAsync)}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
CloseRawPipes();
|
||||
}
|
||||
}
|
||||
|
||||
private void CloseRawPipes()
|
||||
{
|
||||
_filteredStream?.Dispose();
|
||||
_context.OutputProducer.Dispose();
|
||||
_context.Input.Reader.Complete();
|
||||
}
|
||||
|
||||
private void StartFrame()
|
||||
{
|
||||
_lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks;
|
||||
_frame.Start();
|
||||
_frameStartedTcs.SetResult(true);
|
||||
}
|
||||
|
||||
public void Tick(DateTimeOffset now)
|
||||
|
|
@ -216,7 +205,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
|
|||
Timeout();
|
||||
}
|
||||
|
||||
var ignore = StopAsync();
|
||||
_frame.Stop();
|
||||
}
|
||||
|
||||
Interlocked.Exchange(ref _lastTimestamp, timestamp);
|
||||
|
|
|
|||
|
|
@ -54,7 +54,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
protected Stack<KeyValuePair<Func<object, Task>, object>> _onStarting;
|
||||
protected Stack<KeyValuePair<Func<object, Task>, object>> _onCompleted;
|
||||
|
||||
private Task _requestProcessingTask;
|
||||
protected volatile bool _requestProcessingStopping; // volatile, see: https://msdn.microsoft.com/en-us/library/x13ttww7.aspx
|
||||
protected int _requestAborted;
|
||||
private CancellationTokenSource _abortedCts;
|
||||
|
|
@ -104,7 +103,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
public IPipeReader Input { get; set; }
|
||||
public ISocketOutput Output { get; set; }
|
||||
public IEnumerable<IAdaptedConnection> AdaptedConnections { get; set; }
|
||||
public IAdaptedConnection[] AdaptedConnections { get; set; }
|
||||
public ConnectionLifetimeControl LifetimeControl { get; set; }
|
||||
public ITimeoutControl TimeoutControl { get; set; }
|
||||
|
||||
|
|
@ -294,26 +293,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
public bool HasResponseStarted => _requestProcessingStatus == RequestProcessingStatus.ResponseStarted;
|
||||
|
||||
protected FrameRequestHeaders FrameRequestHeaders { get; private set; }
|
||||
protected FrameRequestHeaders FrameRequestHeaders { get; } = new FrameRequestHeaders();
|
||||
|
||||
protected FrameResponseHeaders FrameResponseHeaders { get; private set; }
|
||||
|
||||
public void InitializeHeaders()
|
||||
{
|
||||
if (FrameRequestHeaders == null)
|
||||
{
|
||||
FrameRequestHeaders = new FrameRequestHeaders();
|
||||
}
|
||||
|
||||
RequestHeaders = FrameRequestHeaders;
|
||||
|
||||
if (FrameResponseHeaders == null)
|
||||
{
|
||||
FrameResponseHeaders = new FrameResponseHeaders();
|
||||
}
|
||||
|
||||
ResponseHeaders = FrameResponseHeaders;
|
||||
}
|
||||
protected FrameResponseHeaders FrameResponseHeaders { get; } = new FrameResponseHeaders();
|
||||
|
||||
public void InitializeStreams(MessageBody messageBody)
|
||||
{
|
||||
|
|
@ -333,9 +315,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
|
||||
public void Reset()
|
||||
{
|
||||
FrameRequestHeaders?.Reset();
|
||||
FrameResponseHeaders?.Reset();
|
||||
|
||||
_onStarting = null;
|
||||
_onCompleted = null;
|
||||
|
||||
|
|
@ -366,6 +345,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
LocalPort = LocalEndPoint?.Port ?? 0;
|
||||
ConnectionIdFeature = ConnectionId;
|
||||
|
||||
FrameRequestHeaders.Reset();
|
||||
FrameResponseHeaders.Reset();
|
||||
RequestHeaders = FrameRequestHeaders;
|
||||
ResponseHeaders = FrameResponseHeaders;
|
||||
|
||||
if (AdaptedConnections != null)
|
||||
{
|
||||
try
|
||||
|
|
@ -393,27 +377,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
}
|
||||
|
||||
/// <summary>
|
||||
/// Called once by Connection class to begin the RequestProcessingAsync loop.
|
||||
/// Stops the request processing loop between requests.
|
||||
/// Called on all active connections when the server wants to initiate a shutdown
|
||||
/// and after a keep-alive timeout.
|
||||
/// </summary>
|
||||
public void Start()
|
||||
{
|
||||
Reset();
|
||||
_requestProcessingTask = RequestProcessingAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Should be called when the server wants to initiate a shutdown. The Task returned will
|
||||
/// become complete when the RequestProcessingAsync function has exited. It is expected that
|
||||
/// Stop will be called on all active connections, and Task.WaitAll() will be called on every
|
||||
/// return value.
|
||||
/// </summary>
|
||||
public Task StopAsync()
|
||||
public void Stop()
|
||||
{
|
||||
_requestProcessingStopping = true;
|
||||
Input.CancelPendingRead();
|
||||
|
||||
Debug.Assert(_requestProcessingTask != null);
|
||||
return _requestProcessingTask ?? Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void CancelRequestAbortedToken()
|
||||
|
|
@ -453,7 +424,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
/// The resulting Task from this loop is preserved in a field which is used when the server needs
|
||||
/// to drain and close all currently active connections.
|
||||
/// </summary>
|
||||
public abstract Task RequestProcessingAsync();
|
||||
public abstract Task ProcessRequestsAsync();
|
||||
|
||||
public void OnStarting(Func<object, Task> callback, object state)
|
||||
{
|
||||
|
|
@ -797,11 +768,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
{
|
||||
if (_requestRejectedException != null)
|
||||
{
|
||||
if (FrameRequestHeaders == null || FrameResponseHeaders == null)
|
||||
{
|
||||
InitializeHeaders();
|
||||
}
|
||||
|
||||
return ProduceEnd();
|
||||
}
|
||||
|
||||
|
|
@ -1121,11 +1087,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
StatusCode = statusCode;
|
||||
ReasonPhrase = null;
|
||||
|
||||
if (FrameResponseHeaders == null)
|
||||
{
|
||||
InitializeHeaders();
|
||||
}
|
||||
|
||||
var responseHeaders = FrameResponseHeaders;
|
||||
responseHeaders.Reset();
|
||||
var dateHeaderValues = DateHeaderValueManager.GetDateHeaderValues();
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
/// The resulting Task from this loop is preserved in a field which is used when the server needs
|
||||
/// to drain and close all currently active connections.
|
||||
/// </summary>
|
||||
public override async Task RequestProcessingAsync()
|
||||
public override async Task ProcessRequestsAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
|
@ -36,7 +36,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
{
|
||||
TimeoutControl.SetTimeout(_keepAliveTicks, TimeoutAction.CloseConnection);
|
||||
|
||||
InitializeHeaders();
|
||||
Reset();
|
||||
|
||||
while (!_requestProcessingStopping)
|
||||
{
|
||||
|
|
@ -77,8 +77,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
throw BadHttpRequestException.GetException(
|
||||
RequestRejectionReason.InvalidRequestLine);
|
||||
case RequestProcessingStatus.ParsingHeaders:
|
||||
throw BadHttpRequestException.GetException(RequestRejectionReason
|
||||
.MalformedRequestInvalidHeaders);
|
||||
throw BadHttpRequestException.GetException(
|
||||
RequestRejectionReason.MalformedRequestInvalidHeaders);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -196,15 +196,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
|
|||
// End the connection for non keep alive as data incoming may have been thrown off
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't reset frame state if we're exiting the loop. This avoids losing request rejection
|
||||
// information (for 4xx response), and prevents ObjectDisposedException on HTTPS (ODEs
|
||||
// will be thrown if PrepareRequest is not null and references objects disposed on connection
|
||||
// close - see https://github.com/aspnet/KestrelHttpServer/issues/1103#issuecomment-250237677).
|
||||
if (!_requestProcessingStopping)
|
||||
{
|
||||
Reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (BadHttpRequestException ex)
|
||||
|
|
|
|||
|
|
@ -43,7 +43,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https
|
|||
|
||||
public bool IsHttps => true;
|
||||
|
||||
public async Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
|
||||
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
|
||||
{
|
||||
// Don't trust SslStream not to block.
|
||||
return Task.Run(() => InnerOnConnectionAsync(context));
|
||||
}
|
||||
|
||||
private async Task<IAdaptedConnection> InnerOnConnectionAsync(ConnectionAdapterContext context)
|
||||
{
|
||||
SslStream sslStream;
|
||||
bool certificateRequired;
|
||||
|
|
@ -127,6 +133,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https
|
|||
|
||||
requestFeatures.Get<IHttpRequestFeature>().Scheme = "https";
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_sslStream.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private class ClosedAdaptedConnection : IAdaptedConnection
|
||||
|
|
@ -136,6 +147,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https
|
|||
public void PrepareRequest(IFeatureCollection requestFeatures)
|
||||
{
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,21 +63,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
Input = _connectionContext.Input;
|
||||
Output = new LibuvOutputConsumer(_connectionContext.Output, Thread, _socket, ConnectionId, Log);
|
||||
|
||||
// Start socket prior to applying the ConnectionAdapter
|
||||
StartReading();
|
||||
|
||||
Exception error = null;
|
||||
|
||||
try
|
||||
{
|
||||
// This *must* happen after socket.ReadStart
|
||||
// The socket output consumer is the only thing that can close the connection. If the
|
||||
// output pipe is already closed by the time we start then it's fine since, it'll close gracefully afterwards.
|
||||
await Output.WriteOutputAsync();
|
||||
|
||||
// Now, complete the input so that no more reads can happen
|
||||
Input.Complete(new ConnectionAbortedException());
|
||||
_connectionContext.Output.Complete();
|
||||
_connectionContext.OnConnectionClosed(ex: null);
|
||||
}
|
||||
catch (UvException ex)
|
||||
{
|
||||
error = new IOException(ex.Message, ex);
|
||||
var ioEx = new IOException(ex.Message, ex);
|
||||
|
||||
Input.Complete(ioEx);
|
||||
_connectionContext.Output.Complete(ioEx);
|
||||
_connectionContext.OnConnectionClosed(ioEx);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
@ -85,18 +91,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
|
|||
// on the stream handle
|
||||
Input.CancelPendingFlush();
|
||||
|
||||
// Now, complete the input so that no more reads can happen
|
||||
Input.Complete(new ConnectionAbortedException());
|
||||
|
||||
// Send a FIN
|
||||
Log.ConnectionWriteFin(ConnectionId);
|
||||
|
||||
// We're done with the socket now
|
||||
_socket.Dispose();
|
||||
|
||||
// Tell the kestrel we're done with this connection
|
||||
_connectionContext.OnConnectionClosed(error);
|
||||
_connectionContext.Output.Complete(error);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
|
|
|
|||
|
|
@ -39,18 +39,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
FrameConnectionManager frameConnectionManager,
|
||||
Mock<IKestrelTrace> trace)
|
||||
{
|
||||
var serviceContext = new TestServiceContext
|
||||
{
|
||||
ConnectionManager = frameConnectionManager
|
||||
};
|
||||
|
||||
// The FrameConnection constructor adds itself to the connection manager.
|
||||
var frameConnection = new FrameConnection(new FrameConnectionContext
|
||||
{
|
||||
ServiceContext = serviceContext,
|
||||
ServiceContext = new TestServiceContext(),
|
||||
ConnectionId = connectionId
|
||||
});
|
||||
|
||||
frameConnectionManager.AddConnection(0, frameConnection);
|
||||
|
||||
var connectionCount = 0;
|
||||
frameConnectionManager.Walk(_ => connectionCount++);
|
||||
|
||||
|
|
|
|||
|
|
@ -6,8 +6,10 @@ using System.Collections.Generic;
|
|||
using System.Globalization;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions;
|
||||
using Microsoft.AspNetCore.Testing;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
||||
|
|
@ -19,12 +21,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
{
|
||||
var frameContext = new FrameContext
|
||||
{
|
||||
ServiceContext = new TestServiceContext()
|
||||
ServiceContext = new TestServiceContext(),
|
||||
ConnectionInformation = Mock.Of<IConnectionInformation>()
|
||||
};
|
||||
|
||||
var frame = new Frame<object>(application: null, frameContext: frameContext);
|
||||
|
||||
frame.InitializeHeaders();
|
||||
frame.Reset();
|
||||
|
||||
IDictionary<string, StringValues> headers = frame.ResponseHeaders;
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
_frameContext = new FrameContext
|
||||
{
|
||||
ServiceContext = _serviceContext,
|
||||
ConnectionInformation = new MockConnectionInformation()
|
||||
ConnectionInformation = Mock.Of<IConnectionInformation>()
|
||||
};
|
||||
|
||||
_frame = new TestFrame<object>(application: null, context: _frameContext)
|
||||
|
|
@ -71,7 +71,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
};
|
||||
|
||||
_frame.Reset();
|
||||
_frame.InitializeHeaders();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
|
|
@ -245,28 +244,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void InitializeHeadersResetsRequestHeaders()
|
||||
public void ResetResetsRequestHeaders()
|
||||
{
|
||||
// Arrange
|
||||
var originalRequestHeaders = _frame.RequestHeaders;
|
||||
_frame.RequestHeaders = new FrameRequestHeaders();
|
||||
|
||||
// Act
|
||||
_frame.InitializeHeaders();
|
||||
_frame.Reset();
|
||||
|
||||
// Assert
|
||||
Assert.Same(originalRequestHeaders, _frame.RequestHeaders);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void InitializeHeadersResetsResponseHeaders()
|
||||
public void ResetResetsResponseHeaders()
|
||||
{
|
||||
// Arrange
|
||||
var originalResponseHeaders = _frame.ResponseHeaders;
|
||||
_frame.ResponseHeaders = new FrameResponseHeaders();
|
||||
|
||||
// Act
|
||||
_frame.InitializeHeaders();
|
||||
_frame.Reset();
|
||||
|
||||
// Assert
|
||||
Assert.Same(originalResponseHeaders, _frame.ResponseHeaders);
|
||||
|
|
@ -463,17 +462,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
public void RequestProcessingAsyncEnablesKeepAliveTimeout()
|
||||
public void ProcessRequestsAsyncEnablesKeepAliveTimeout()
|
||||
{
|
||||
var connectionControl = new Mock<ITimeoutControl>();
|
||||
_frame.TimeoutControl = connectionControl.Object;
|
||||
|
||||
_frame.Start();
|
||||
var requestProcessingTask = _frame.ProcessRequestsAsync();
|
||||
|
||||
var expectedKeepAliveTimeout = _serviceContext.ServerOptions.Limits.KeepAliveTimeout.Ticks;
|
||||
connectionControl.Verify(cc => cc.SetTimeout(expectedKeepAliveTimeout, TimeoutAction.CloseConnection));
|
||||
|
||||
var requestProcessingTask = _frame.StopAsync();
|
||||
_frame.Stop();
|
||||
_input.Writer.Complete();
|
||||
|
||||
requestProcessingTask.Wait();
|
||||
|
|
@ -553,12 +552,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
[Fact]
|
||||
public async Task RequestProcessingTaskIsUnwrapped()
|
||||
{
|
||||
_frame.Start();
|
||||
var requestProcessingTask = _frame.ProcessRequestsAsync();
|
||||
|
||||
var data = Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost:\r\n\r\n");
|
||||
await _input.Writer.WriteAsync(data);
|
||||
|
||||
var requestProcessingTask = _frame.StopAsync();
|
||||
_frame.Stop();
|
||||
Assert.IsNotType(typeof(Task<Task>), requestProcessingTask);
|
||||
|
||||
await requestProcessingTask.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
|
|
@ -677,7 +676,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
var headers0 = MakeHeaders(header0Count);
|
||||
var headers1 = MakeHeaders(header1Count, header0Count);
|
||||
|
||||
var requestProcessingTask = _frame.RequestProcessingAsync();
|
||||
var requestProcessingTask = _frame.ProcessRequestsAsync();
|
||||
|
||||
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
|
||||
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders != null);
|
||||
|
|
@ -711,7 +710,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
var headers0 = MakeHeaders(header0Count);
|
||||
var headers1 = MakeHeaders(header1Count, header0Count);
|
||||
|
||||
var requestProcessingTask = _frame.RequestProcessingAsync();
|
||||
var requestProcessingTask = _frame.ProcessRequestsAsync();
|
||||
|
||||
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
|
||||
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders != null);
|
||||
|
|
@ -830,16 +829,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
|
|||
}
|
||||
}
|
||||
|
||||
private class MockConnectionInformation : IConnectionInformation
|
||||
{
|
||||
public IPEndPoint RemoteEndPoint { get; }
|
||||
public IPEndPoint LocalEndPoint { get; }
|
||||
public PipeFactory PipeFactory { get; }
|
||||
public bool RequiresDispatch { get; }
|
||||
public IScheduler InputWriterScheduler { get; }
|
||||
public IScheduler OutputReaderScheduler { get; }
|
||||
}
|
||||
|
||||
private class RequestHeadersWrapper : IHeaderDictionary
|
||||
{
|
||||
IHeaderDictionary _innerHeaders;
|
||||
|
|
|
|||
|
|
@ -156,6 +156,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
public void PrepareRequest(IFeatureCollection requestFeatures)
|
||||
{
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
private class RewritingStream : Stream
|
||||
|
|
|
|||
|
|
@ -97,7 +97,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
[Fact]
|
||||
public async Task DoesNotThrowObjectDisposedExceptionOnConnectionAbort()
|
||||
{
|
||||
var x509Certificate2 = new X509Certificate2(TestResources.TestCertificatePath, "testPassword");
|
||||
var loggerFactory = new HandshakeErrorLoggerFactory();
|
||||
var hostBuilder = new WebHostBuilder()
|
||||
.UseKestrel(options =>
|
||||
|
|
@ -150,7 +149,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
public async Task DoesNotThrowObjectDisposedExceptionFromWriteAsyncAfterConnectionIsAborted()
|
||||
{
|
||||
var tcs = new TaskCompletionSource<object>();
|
||||
var x509Certificate2 = new X509Certificate2(TestResources.TestCertificatePath, "testPassword");
|
||||
var loggerFactory = new HandshakeErrorLoggerFactory();
|
||||
var hostBuilder = new WebHostBuilder()
|
||||
.UseKestrel(options =>
|
||||
|
|
@ -196,6 +194,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
await tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(10));
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/aspnet/KestrelHttpServer/issues/1693
|
||||
[Fact]
|
||||
public async Task DoesNotThrowObjectDisposedExceptionOnEmptyConnection()
|
||||
{
|
||||
var loggerFactory = new HandshakeErrorLoggerFactory();
|
||||
var hostBuilder = new WebHostBuilder()
|
||||
.UseKestrel(options =>
|
||||
{
|
||||
options.Listen(new IPEndPoint(IPAddress.Loopback, 0), listenOptions =>
|
||||
{
|
||||
listenOptions.UseHttps(TestResources.TestCertificatePath, "testPassword");
|
||||
});
|
||||
})
|
||||
.UseLoggerFactory(loggerFactory)
|
||||
.Configure(app => app.Run(httpContext => Task.CompletedTask));
|
||||
|
||||
using (var host = hostBuilder.Build())
|
||||
{
|
||||
host.Start();
|
||||
|
||||
using (var socket = await HttpClientSlim.GetSocket(new Uri($"https://127.0.0.1:{host.GetPort()}/")))
|
||||
using (var stream = new NetworkStream(socket, ownsSocket: false))
|
||||
using (var sslStream = new SslStream(stream, true, (sender, certificate, chain, errors) => true))
|
||||
{
|
||||
await sslStream.AuthenticateAsClientAsync("127.0.0.1", clientCertificates: null,
|
||||
enabledSslProtocols: SslProtocols.Tls11 | SslProtocols.Tls12,
|
||||
checkCertificateRevocation: false);
|
||||
}
|
||||
}
|
||||
|
||||
Assert.False(loggerFactory.ErrorLogger.ObjectDisposedExceptionLogged);
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/aspnet/KestrelHttpServer/pull/1197
|
||||
[Fact]
|
||||
public void ConnectionFilterDoesNotLeakBlock()
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
|
|||
context.Response.OnStarting(() => Task.Run(() => onStartingCalled = true));
|
||||
context.Response.OnCompleted(() => Task.Run(() => onCompletedCalled = true));
|
||||
|
||||
// Prevent OnStarting call (see Frame<T>.RequestProcessingAsync()).
|
||||
// Prevent OnStarting call (see Frame<T>.ProcessRequestsAsync()).
|
||||
throw new Exception();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -70,8 +70,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
ErrorUtilities.ThrowInvalidRequestLine();
|
||||
}
|
||||
|
||||
_frame.InitializeHeaders();
|
||||
|
||||
if (!_frame.TakeMessageHeaders(_buffer, out consumed, out examined))
|
||||
{
|
||||
ErrorUtilities.ThrowInvalidRequestHeaders();
|
||||
|
|
@ -91,7 +89,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
private void ParseRequestHeaders()
|
||||
{
|
||||
_frame.Reset();
|
||||
_frame.InitializeHeaders();
|
||||
|
||||
if (!_frame.TakeMessageHeaders(_buffer, out var consumed, out var examined))
|
||||
{
|
||||
|
|
|
|||
|
|
@ -109,7 +109,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
};
|
||||
|
||||
frame.Reset();
|
||||
frame.InitializeHeaders();
|
||||
|
||||
return frame;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,8 +148,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
|
||||
readableBuffer = readableBuffer.Slice(consumed);
|
||||
|
||||
Frame.InitializeHeaders();
|
||||
|
||||
if (!Frame.TakeMessageHeaders(readableBuffer, out consumed, out examined))
|
||||
{
|
||||
ErrorUtilities.ThrowInvalidRequestHeaders();
|
||||
|
|
@ -187,8 +185,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
result = Pipe.Reader.ReadAsync().GetAwaiter().GetResult();
|
||||
readableBuffer = result.Buffer;
|
||||
|
||||
Frame.InitializeHeaders();
|
||||
|
||||
if (!Frame.TakeMessageHeaders(readableBuffer, out consumed, out examined))
|
||||
{
|
||||
ErrorUtilities.ThrowInvalidRequestHeaders();
|
||||
|
|
|
|||
|
|
@ -182,7 +182,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
var frame = new Frame<object>(application: null, frameContext: frameContext);
|
||||
|
||||
frame.Reset();
|
||||
frame.InitializeHeaders();
|
||||
_responseHeadersDirect = (FrameResponseHeaders)frame.ResponseHeaders;
|
||||
var context = new DefaultHttpContext(frame);
|
||||
_response = new DefaultHttpResponse(context);
|
||||
|
|
|
|||
|
|
@ -138,7 +138,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
|
|||
};
|
||||
|
||||
frame.Reset();
|
||||
frame.InitializeHeaders();
|
||||
|
||||
// Start writing
|
||||
var ignore = socketOutput.WriteOutputAsync();
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Http.Features;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter;
|
||||
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
|
||||
|
||||
namespace Microsoft.AspNetCore.Testing
|
||||
|
|
@ -31,6 +30,10 @@ namespace Microsoft.AspNetCore.Testing
|
|||
public void PrepareRequest(IFeatureCollection requestFeatures)
|
||||
{
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue