Added initial connection middleware pipeline (#2003)

* Added initial connection middleware pipeline
- Implemented IConnectionBuilder on ListenOptions. Kept IConnectionAdapter for now.
- Delay the configure callback for ListenOptions until the server has started.
- Added ConnectionLimitMiddleware and HttpConnectionMiddleware
- Expose ConnectionAborted and ConnectionClosed on ConnectionContext and
IConnectionTransportFeature
- Updated the tests
- Removed IConnectionApplicationFeature
- Moved Application to IConnectionTransportFeature
This commit is contained in:
David Fowler 2017-08-21 12:11:27 -07:00 committed by GitHub
parent 59b77bb357
commit 2e6687031d
43 changed files with 618 additions and 412 deletions

View File

@ -24,16 +24,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
private static readonly Func<object, Task> _psuedoAsyncTaskFunc = (obj) => _psuedoAsyncTask;
private readonly TestFrame<object> _frame;
private readonly IPipe _outputPipe;
private (IPipeConnection Transport, IPipeConnection Application) _pair;
private readonly byte[] _writeData;
public FrameWritingBenchmark()
{
var pipeFactory = new PipeFactory();
_outputPipe = pipeFactory.Create();
_frame = MakeFrame(pipeFactory);
_frame = MakeFrame();
_writeData = Encoding.ASCII.GetBytes("Hello, World!");
}
@ -93,9 +90,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
return _frame.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default(CancellationToken));
}
private TestFrame<object> MakeFrame(PipeFactory pipeFactory)
private TestFrame<object> MakeFrame()
{
var input = pipeFactory.Create();
var pipeFactory = new PipeFactory();
var pair = pipeFactory.CreateConnectionPair();
_pair = pair;
var serviceContext = new ServiceContext
{
@ -109,8 +108,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
{
ServiceContext = serviceContext,
PipeFactory = pipeFactory,
Input = input.Reader,
Output = _outputPipe
Application = pair.Application,
Transport = pair.Transport
});
frame.Reset();
@ -122,7 +121,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
[IterationCleanup]
public void Cleanup()
{
var reader = _outputPipe.Reader;
var reader = _pair.Application.Input;
if (reader.TryRead(out var readResult))
{
reader.Advance(readResult.Buffer.End);

View File

@ -110,8 +110,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
public void Setup()
{
var pipeFactory = new PipeFactory();
var input = pipeFactory.Create();
var output = pipeFactory.Create();
var pair = pipeFactory.CreateConnectionPair();
var serviceContext = new ServiceContext
{
@ -126,8 +125,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance
ServiceContext = serviceContext,
PipeFactory = pipeFactory,
TimeoutControl = new MockTimeoutControl(),
Input = input.Reader,
Output = output
Application = pair.Application,
Transport = pair.Transport
});
frame.Reset();

View File

@ -5,36 +5,36 @@ using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public class AdaptedPipeline
public class AdaptedPipeline : IPipeConnection
{
private const int MinAllocBufferSize = 2048;
private readonly IKestrelTrace _trace;
private readonly IPipe _transportOutputPipe;
private readonly IPipeReader _transportInputPipeReader;
private readonly IPipeConnection _transport;
private readonly IPipeConnection _application;
public AdaptedPipeline(IPipeReader transportInputPipeReader,
IPipe transportOutputPipe,
public AdaptedPipeline(IPipeConnection transport,
IPipeConnection application,
IPipe inputPipe,
IPipe outputPipe,
IKestrelTrace trace)
IPipe outputPipe)
{
_transportInputPipeReader = transportInputPipeReader;
_transportOutputPipe = transportOutputPipe;
_transport = transport;
_application = application;
Input = inputPipe;
Output = outputPipe;
_trace = trace;
}
public IPipe Input { get; }
public IPipe Output { get; }
IPipeReader IPipeConnection.Input => Input.Reader;
IPipeWriter IPipeConnection.Output => Output.Writer;
public async Task RunAsync(Stream stream)
{
var inputTask = ReadInputAsync(stream);
@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
if (result.IsCancelled)
{
// Forward the cancellation to the transport pipe
_transportOutputPipe.Reader.CancelPendingRead();
_application.Input.CancelPendingRead();
break;
}
@ -104,7 +104,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
finally
{
Output.Reader.Complete();
_transportOutputPipe.Writer.Complete(error);
_transport.Output.Complete();
}
}
@ -161,8 +161,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
Input.Writer.Complete(error);
// The application could have ended the input pipe so complete
// the transport pipe as well
_transportInputPipeReader.Complete();
_transport.Input.Complete();
}
}
public void Dispose()
{
}
}
}

View File

@ -1,31 +1,27 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Net;
using System.Threading;
using Microsoft.AspNetCore.Hosting.Server;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Protocols.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class ConnectionHandler<TContext> : IConnectionHandler
public class ConnectionHandler : IConnectionHandler
{
private static long _lastFrameConnectionId = long.MinValue;
private readonly ListenOptions _listenOptions;
private readonly ServiceContext _serviceContext;
private readonly IHttpApplication<TContext> _application;
private readonly ConnectionDelegate _connectionDelegate;
public ConnectionHandler(ListenOptions listenOptions, ServiceContext serviceContext, IHttpApplication<TContext> application)
public ConnectionHandler(ServiceContext serviceContext, ConnectionDelegate connectionDelegate)
{
_listenOptions = listenOptions;
_serviceContext = serviceContext;
_application = application;
_connectionDelegate = connectionDelegate;
}
public void OnConnection(IFeatureCollection features)
@ -34,89 +30,57 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
var transportFeature = connectionContext.Features.Get<IConnectionTransportFeature>();
var inputPipe = transportFeature.PipeFactory.Create(GetInputPipeOptions(transportFeature.InputWriterScheduler));
var outputPipe = transportFeature.PipeFactory.Create(GetOutputPipeOptions(transportFeature.OutputReaderScheduler));
// REVIEW: Unfortunately, we still need to use the service context to create the pipes since the settings
// for the scheduler and limits are specified here
var inputOptions = GetInputPipeOptions(_serviceContext, transportFeature.InputWriterScheduler);
var outputOptions = GetOutputPipeOptions(_serviceContext, transportFeature.OutputReaderScheduler);
var connectionId = CorrelationIdGenerator.GetNextId();
var frameConnectionId = Interlocked.Increment(ref _lastFrameConnectionId);
var pair = connectionContext.PipeFactory.CreateConnectionPair(inputOptions, outputOptions);
// Set the transport and connection id
connectionContext.ConnectionId = connectionId;
transportFeature.Connection = new PipeConnection(inputPipe.Reader, outputPipe.Writer);
var applicationConnection = new PipeConnection(outputPipe.Reader, inputPipe.Writer);
connectionContext.ConnectionId = CorrelationIdGenerator.GetNextId();
connectionContext.Transport = pair.Transport;
if (!_serviceContext.ConnectionManager.NormalConnectionCount.TryLockOne())
// This *must* be set before returning from OnConnection
transportFeature.Application = pair.Application;
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = Execute(connectionContext);
}
private async Task Execute(ConnectionContext connectionContext)
{
try
{
var goAway = new RejectionConnection(inputPipe, outputPipe, connectionId, _serviceContext)
{
Connection = applicationConnection
};
connectionContext.Features.Set<IConnectionApplicationFeature>(goAway);
goAway.Reject();
return;
await _connectionDelegate(connectionContext);
}
var frameConnectionContext = new FrameConnectionContext
catch (Exception ex)
{
ConnectionId = connectionId,
FrameConnectionId = frameConnectionId,
ServiceContext = _serviceContext,
PipeFactory = connectionContext.PipeFactory,
ConnectionAdapters = _listenOptions.ConnectionAdapters,
Input = inputPipe,
Output = outputPipe
};
var connectionFeature = connectionContext.Features.Get<IHttpConnectionFeature>();
if (connectionFeature != null)
{
if (connectionFeature.LocalIpAddress != null)
{
frameConnectionContext.LocalEndPoint = new IPEndPoint(connectionFeature.LocalIpAddress, connectionFeature.LocalPort);
}
if (connectionFeature.RemoteIpAddress != null)
{
frameConnectionContext.RemoteEndPoint = new IPEndPoint(connectionFeature.RemoteIpAddress, connectionFeature.RemotePort);
}
_serviceContext.Log.LogCritical(0, ex, $"{nameof(ConnectionHandler)}.{nameof(Execute)}() {connectionContext.ConnectionId}");
}
var connection = new FrameConnection(frameConnectionContext)
{
Connection = applicationConnection
};
connectionContext.Features.Set<IConnectionApplicationFeature>(connection);
// Since data cannot be added to the inputPipe by the transport until OnConnection returns,
// Frame.ProcessRequestsAsync is guaranteed to unblock the transport thread before calling
// application code.
connection.StartRequestProcessing(_application);
}
// Internal for testing
internal PipeOptions GetInputPipeOptions(IScheduler writerScheduler) => new PipeOptions
internal static PipeOptions GetInputPipeOptions(ServiceContext serviceContext, IScheduler writerScheduler) => new PipeOptions
{
ReaderScheduler = _serviceContext.ThreadPool,
ReaderScheduler = serviceContext.ThreadPool,
WriterScheduler = writerScheduler,
MaximumSizeHigh = _serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
MaximumSizeLow = _serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0
MaximumSizeHigh = serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
MaximumSizeLow = serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0
};
internal PipeOptions GetOutputPipeOptions(IScheduler readerScheduler) => new PipeOptions
internal static PipeOptions GetOutputPipeOptions(ServiceContext serviceContext, IScheduler readerScheduler) => new PipeOptions
{
ReaderScheduler = readerScheduler,
WriterScheduler = _serviceContext.ThreadPool,
MaximumSizeHigh = GetOutputResponseBufferSize(),
MaximumSizeLow = GetOutputResponseBufferSize()
WriterScheduler = serviceContext.ThreadPool,
MaximumSizeHigh = GetOutputResponseBufferSize(serviceContext),
MaximumSizeLow = GetOutputResponseBufferSize(serviceContext)
};
private long GetOutputResponseBufferSize()
private static long GetOutputResponseBufferSize(ServiceContext serviceContext)
{
var bufferSize = _serviceContext.ServerOptions.Limits.MaxResponseBufferSize;
var bufferSize = serviceContext.ServerOptions.Limits.MaxResponseBufferSize;
if (bufferSize == 0)
{
// 0 = no buffering so we need to configure the pipe so the the writer waits on the reader directly

View File

@ -0,0 +1,16 @@
using Microsoft.AspNetCore.Protocols;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public static class ConnectionLimitBuilderExtensions
{
public static IConnectionBuilder UseConnectionLimit(this IConnectionBuilder builder, ServiceContext serviceContext)
{
return builder.Use(next =>
{
var middleware = new ConnectionLimitMiddleware(next, serviceContext);
return middleware.OnConnectionAsync;
});
}
}
}

View File

@ -0,0 +1,32 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class ConnectionLimitMiddleware
{
private readonly ServiceContext _serviceContext;
private readonly ConnectionDelegate _next;
public ConnectionLimitMiddleware(ConnectionDelegate next, ServiceContext serviceContext)
{
_next = next;
_serviceContext = serviceContext;
}
public Task OnConnectionAsync(ConnectionContext connection)
{
if (!_serviceContext.ConnectionManager.NormalConnectionCount.TryLockOne())
{
KestrelEventSource.Log.ConnectionRejected(connection.ConnectionId);
_serviceContext.Log.ConnectionRejected(connection.ConnectionId);
connection.Transport.Input.Complete();
connection.Transport.Output.Complete();
return Task.CompletedTask;
}
return _next(connection);
}
}
}

View File

@ -11,7 +11,6 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
@ -21,14 +20,14 @@ using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class FrameConnection : IConnectionApplicationFeature, ITimeoutControl
public class FrameConnection : ITimeoutControl
{
private const int Http2ConnectionNotStarted = 0;
private const int Http2ConnectionStarted = 1;
private const int Http2ConnectionClosed = 2;
private readonly FrameConnectionContext _context;
private List<IAdaptedConnection> _adaptedConnections;
private IList<IAdaptedConnection> _adaptedConnections;
private readonly TaskCompletionSource<object> _socketClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private Frame _frame;
private Http2Connection _http2Connection;
@ -62,8 +61,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public bool TimedOut { get; private set; }
public string ConnectionId => _context.ConnectionId;
public IPipeWriter Input => _context.Input.Writer;
public IPipeReader Output => _context.Output.Reader;
public IPEndPoint LocalEndPoint => _context.LocalEndPoint;
public IPEndPoint RemoteEndPoint => _context.RemoteEndPoint;
@ -88,14 +85,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private IKestrelTrace Log => _context.ServiceContext.Log;
public IPipeConnection Connection { get; set; }
public void StartRequestProcessing<TContext>(IHttpApplication<TContext> application)
public Task StartRequestProcessing<TContext>(IHttpApplication<TContext> application)
{
_lifetimeTask = ProcessRequestsAsync(application);
return _lifetimeTask = ProcessRequestsAsync(application);
}
private async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> application)
private async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> httpApplication)
{
using (BeginConnectionScope())
{
@ -106,23 +101,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
AdaptedPipeline adaptedPipeline = null;
var adaptedPipelineTask = Task.CompletedTask;
var input = _context.Input.Reader;
var output = _context.Output;
var transport = _context.Transport;
var application = _context.Application;
if (_context.ConnectionAdapters.Count > 0)
{
adaptedPipeline = new AdaptedPipeline(input,
output,
PipeFactory.Create(AdaptedInputPipeOptions),
PipeFactory.Create(AdaptedOutputPipeOptions),
Log);
adaptedPipeline = new AdaptedPipeline(transport,
application,
PipeFactory.Create(AdaptedInputPipeOptions),
PipeFactory.Create(AdaptedOutputPipeOptions));
input = adaptedPipeline.Input.Reader;
output = adaptedPipeline.Output;
transport = adaptedPipeline;
}
// _frame must be initialized before adding the connection to the connection manager
CreateFrame(application, input, output);
CreateFrame(httpApplication, transport, application);
// _http2Connection must be initialized before yield control to the transport thread,
// to prevent a race condition where _http2Connection.Abort() is called just as
@ -130,12 +124,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_http2Connection = new Http2Connection(new Http2ConnectionContext
{
ConnectionId = _context.ConnectionId,
ServiceContext = _context.ServiceContext,
ServiceContext = _context.ServiceContext,
PipeFactory = PipeFactory,
LocalEndPoint = LocalEndPoint,
RemoteEndPoint = RemoteEndPoint,
Input = input,
Output = output
Application = application,
Transport = transport
});
// Do this before the first await so we don't yield control to the transport until we've
@ -153,7 +147,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
if (_frame.ConnectionFeatures?.Get<ITlsApplicationProtocolFeature>()?.ApplicationProtocol == "h2" &&
Interlocked.CompareExchange(ref _http2ConnectionState, Http2ConnectionStarted, Http2ConnectionNotStarted) == Http2ConnectionNotStarted)
{
await _http2Connection.ProcessAsync(application);
await _http2Connection.ProcessAsync(httpApplication);
}
else
{
@ -187,9 +181,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
}
internal void CreateFrame<TContext>(IHttpApplication<TContext> application, IPipeReader input, IPipe output)
internal void CreateFrame<TContext>(IHttpApplication<TContext> httpApplication, IPipeConnection transport, IPipeConnection application)
{
_frame = new Frame<TContext>(application, new FrameContext
_frame = new Frame<TContext>(httpApplication, new FrameContext
{
ConnectionId = _context.ConnectionId,
PipeFactory = PipeFactory,
@ -197,8 +191,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
RemoteEndPoint = RemoteEndPoint,
ServiceContext = _context.ServiceContext,
TimeoutControl = this,
Input = input,
Output = output
Transport = transport,
Application = application
});
}
@ -268,7 +262,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
var features = new FeatureCollection();
var connectionAdapters = _context.ConnectionAdapters;
var stream = new RawStream(_context.Input.Reader, _context.Output.Writer);
var stream = new RawStream(_context.Transport.Input, _context.Transport.Output);
var adapterContext = new ConnectionAdapterContext(features, stream);
_adaptedConnections = new List<IAdaptedConnection>(connectionAdapters.Count);

View File

@ -13,12 +13,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public string ConnectionId { get; set; }
public long FrameConnectionId { get; set; }
public ServiceContext ServiceContext { get; set; }
public List<IConnectionAdapter> ConnectionAdapters { get; set; }
public IList<IConnectionAdapter> ConnectionAdapters { get; set; }
public PipeFactory PipeFactory { get; set; }
public IPEndPoint LocalEndPoint { get; set; }
public IPEndPoint RemoteEndPoint { get; set; }
public IPipe Input { get; set; }
public IPipe Output { get; set; }
public IPipeConnection Transport { get; set; }
public IPipeConnection Application { get; set; }
}
}

View File

@ -96,7 +96,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_keepAliveTicks = ServerOptions.Limits.KeepAliveTimeout.Ticks;
_requestHeadersTimeoutTicks = ServerOptions.Limits.RequestHeadersTimeout.Ticks;
Output = new OutputProducer(frameContext.Output, frameContext.ConnectionId, frameContext.ServiceContext.Log, TimeoutControl);
Output = new OutputProducer(frameContext.Application.Input, frameContext.Transport.Output, frameContext.ConnectionId, frameContext.ServiceContext.Log, TimeoutControl);
RequestBodyPipe = CreateRequestBodyPipe();
}
@ -107,7 +107,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private IPEndPoint RemoteEndPoint => _frameContext.RemoteEndPoint;
public IFeatureCollection ConnectionFeatures { get; set; }
public IPipeReader Input => _frameContext.Input;
public IPipeReader Input => _frameContext.Transport.Input;
public OutputProducer Output { get; }
public ITimeoutControl TimeoutControl => _frameContext.TimeoutControl;

View File

@ -16,7 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
public IPEndPoint RemoteEndPoint { get; set; }
public IPEndPoint LocalEndPoint { get; set; }
public ITimeoutControl TimeoutControl { get; set; }
public IPipeReader Input { get; set; }
public IPipe Output { get; set; }
public IPipeConnection Transport { get; set; }
public IPipeConnection Application { get; set; }
}
}

View File

@ -24,7 +24,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private bool _completed = false;
private readonly IPipe _pipe;
private readonly IPipeWriter _pipeWriter;
private readonly IPipeReader _outputPipeReader;
// https://github.com/dotnet/corefxlab/issues/1334
// Pipelines don't support multiple awaiters on flush
@ -34,12 +35,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
private Action _flushCompleted;
public OutputProducer(
IPipe pipe,
IPipeReader outputPipeReader,
IPipeWriter pipeWriter,
string connectionId,
IKestrelTrace log,
ITimeoutControl timeoutControl)
{
_pipe = pipe;
_outputPipeReader = outputPipeReader;
_pipeWriter = pipeWriter;
_connectionId = connectionId;
_timeoutControl = timeoutControl;
_log = log;
@ -70,7 +73,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return;
}
var buffer = _pipe.Writer.Alloc(1);
var buffer = _pipeWriter.Alloc(1);
callback(buffer, state);
buffer.Commit();
}
@ -87,7 +90,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_log.ConnectionDisconnect(_connectionId);
_completed = true;
_pipe.Writer.Complete();
_pipeWriter.Complete();
}
}
@ -103,8 +106,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
_log.ConnectionDisconnect(_connectionId);
_completed = true;
_pipe.Reader.CancelPendingRead();
_pipe.Writer.Complete(error);
_outputPipeReader.CancelPendingRead();
_pipeWriter.Complete(error);
}
}
@ -122,7 +125,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
return Task.CompletedTask;
}
writableBuffer = _pipe.Writer.Alloc(1);
writableBuffer = _pipeWriter.Alloc(1);
var writer = new WritableBufferWriter(writableBuffer);
if (buffer.Count > 0)
{

View File

@ -40,13 +40,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public Http2Connection(Http2ConnectionContext context)
{
_context = context;
_frameWriter = new Http2FrameWriter(context.Output);
_frameWriter = new Http2FrameWriter(context.Transport.Output, context.Application.Input);
_hpackDecoder = new HPackDecoder();
}
public string ConnectionId => _context.ConnectionId;
public IPipeReader Input => _context.Input;
public IPipeReader Input => _context.Transport.Input;
public IKestrelTrace Log => _context.ServiceContext.Log;

View File

@ -14,7 +14,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
public IPEndPoint LocalEndPoint { get; set; }
public IPEndPoint RemoteEndPoint { get; set; }
public IPipeReader Input { get; set; }
public IPipe Output { get; set; }
public IPipeConnection Transport { get; set; }
public IPipeConnection Application { get; set; }
}
}

View File

@ -19,13 +19,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
private readonly Http2Frame _outgoingFrame = new Http2Frame();
private readonly object _writeLock = new object();
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly IPipe _output;
private readonly IPipeWriter _outputWriter;
private readonly IPipeReader _outputReader;
private bool _completed;
public Http2FrameWriter(IPipe output)
public Http2FrameWriter(IPipeWriter outputPipeWriter, IPipeReader outputPipeReader)
{
_output = output;
_outputWriter = outputPipeWriter;
_outputReader = outputPipeReader;
}
public void Abort(Exception ex)
@ -33,8 +35,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
lock (_writeLock)
{
_completed = true;
_output.Reader.CancelPendingRead();
_output.Writer.Complete(ex);
_outputReader.CancelPendingRead();
_outputWriter.Complete(ex);
}
}
@ -173,7 +175,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
return;
}
var writeableBuffer = _output.Writer.Alloc(1);
var writeableBuffer = _outputWriter.Alloc(1);
writeableBuffer.Write(data);
await writeableBuffer.FlushAsync(cancellationToken);
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public static class HttpConnectionBuilderExtensions
{
public static IConnectionBuilder UseHttpServer<TContext>(this IConnectionBuilder builder, ServiceContext serviceContext, IHttpApplication<TContext> application)
{
return builder.UseHttpServer(Array.Empty<IConnectionAdapter>(), serviceContext, application);
}
public static IConnectionBuilder UseHttpServer<TContext>(this IConnectionBuilder builder, IList<IConnectionAdapter> adapters, ServiceContext serviceContext, IHttpApplication<TContext> application)
{
var middleware = new HttpConnectionMiddleware<TContext>(adapters, serviceContext, application);
return builder.Use(next =>
{
return middleware.OnConnectionAsync;
});
}
}
}

View File

@ -0,0 +1,91 @@
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Protocols.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class HttpConnectionMiddleware<TContext>
{
private static long _lastFrameConnectionId = long.MinValue;
private readonly IList<IConnectionAdapter> _connectionAdapters;
private readonly ServiceContext _serviceContext;
private readonly IHttpApplication<TContext> _application;
public HttpConnectionMiddleware(IList<IConnectionAdapter> adapters, ServiceContext serviceContext, IHttpApplication<TContext> application)
{
_serviceContext = serviceContext;
_application = application;
// Keeping these around for now so progress can be made without updating tests
_connectionAdapters = adapters;
}
public Task OnConnectionAsync(ConnectionContext connectionContext)
{
// We need the transport feature so that we can cancel the output reader that the transport is using
// This is a bit of a hack but it preserves the existing semantics
var transportFeature = connectionContext.Features.Get<IConnectionTransportFeature>();
var frameConnectionId = Interlocked.Increment(ref _lastFrameConnectionId);
var frameConnectionContext = new FrameConnectionContext
{
ConnectionId = connectionContext.ConnectionId,
FrameConnectionId = frameConnectionId,
ServiceContext = _serviceContext,
PipeFactory = connectionContext.PipeFactory,
ConnectionAdapters = _connectionAdapters,
Transport = connectionContext.Transport,
Application = transportFeature.Application
};
var connectionFeature = connectionContext.Features.Get<IHttpConnectionFeature>();
if (connectionFeature != null)
{
if (connectionFeature.LocalIpAddress != null)
{
frameConnectionContext.LocalEndPoint = new IPEndPoint(connectionFeature.LocalIpAddress, connectionFeature.LocalPort);
}
if (connectionFeature.RemoteIpAddress != null)
{
frameConnectionContext.RemoteEndPoint = new IPEndPoint(connectionFeature.RemoteIpAddress, connectionFeature.RemotePort);
}
}
var connection = new FrameConnection(frameConnectionContext);
// The order here is important, start request processing so that
// the frame is created before this yields. Events need to be wired up
// afterwards
var processingTask = connection.StartRequestProcessing(_application);
// Wire up the events an forward calls to the frame connection
// It's important that these execute synchronously because graceful
// connection close is order sensative (for now)
connectionContext.ConnectionAborted.ContinueWith((task, state) =>
{
// Unwrap the aggregate exception
((FrameConnection)state).Abort(task.Exception?.InnerException);
},
connection, TaskContinuationOptions.ExecuteSynchronously);
connectionContext.ConnectionClosed.ContinueWith((task, state) =>
{
// Unwrap the aggregate exception
((FrameConnection)state).OnConnectionClosed(task.Exception?.InnerException);
},
connection, TaskContinuationOptions.ExecuteSynchronously);
return processingTask;
}
}
}

View File

@ -1,47 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Protocols.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class RejectionConnection : IConnectionApplicationFeature
{
private readonly IKestrelTrace _log;
private readonly IPipe _input;
private readonly IPipe _output;
public RejectionConnection(IPipe input, IPipe output, string connectionId, ServiceContext serviceContext)
{
ConnectionId = connectionId;
_log = serviceContext.Log;
_input = input;
_output = output;
}
public string ConnectionId { get; }
public IPipeWriter Input => _input.Writer;
public IPipeReader Output => _output.Reader;
public IPipeConnection Connection { get; set; }
public void Reject()
{
KestrelEventSource.Log.ConnectionRejected(ConnectionId);
_log.ConnectionRejected(ConnectionId);
_input.Reader.Complete();
_output.Writer.Complete();
}
void IConnectionApplicationFeature.OnConnectionClosed(Exception ex)
{
}
void IConnectionApplicationFeature.Abort(Exception ex)
{
}
}
}

View File

@ -135,7 +135,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
async Task OnBind(ListenOptions endpoint)
{
var connectionHandler = new ConnectionHandler<TContext>(endpoint, ServiceContext, application);
// Add the connection limit middleware
endpoint.UseConnectionLimit(ServiceContext);
// Configure the user delegate
endpoint.Configure(endpoint);
// Add the HTTP middleware as the terminal connection middleware
endpoint.UseHttpServer(endpoint.ConnectionAdapters, ServiceContext, application);
var connectionHandler = new ConnectionHandler(ServiceContext, endpoint.Build());
var transport = _transportFactory.Create(endpoint, connectionHandler);
_transports.Add(transport);

View File

@ -100,8 +100,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
throw new ArgumentNullException(nameof(configure));
}
var listenOptions = new ListenOptions(endPoint) { KestrelServerOptions = this };
configure(listenOptions);
var listenOptions = new ListenOptions(endPoint) { KestrelServerOptions = this, Configure = configure };
ListenOptions.Add(listenOptions);
}
@ -132,8 +131,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
throw new ArgumentNullException(nameof(configure));
}
var listenOptions = new ListenOptions(socketPath) { KestrelServerOptions = this };
configure(listenOptions);
var listenOptions = new ListenOptions(socketPath) { KestrelServerOptions = this, Configure = configure };
ListenOptions.Add(listenOptions);
}
@ -156,8 +154,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
throw new ArgumentNullException(nameof(configure));
}
var listenOptions = new ListenOptions(handle) { KestrelServerOptions = this };
configure(listenOptions);
var listenOptions = new ListenOptions(handle) { KestrelServerOptions = this, Configure = configure };
ListenOptions.Add(listenOptions);
}
}

View File

@ -5,6 +5,8 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
@ -14,9 +16,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
/// Describes either an <see cref="IPEndPoint"/>, Unix domain socket path, or a file descriptor for an already open
/// socket that Kestrel should bind to or open.
/// </summary>
public class ListenOptions : IEndPointInformation
public class ListenOptions : IEndPointInformation, IConnectionBuilder
{
private FileHandleType _handleType;
private readonly List<Func<ConnectionDelegate, ConnectionDelegate>> _components = new List<Func<ConnectionDelegate, ConnectionDelegate>>();
internal ListenOptions(IPEndPoint endPoint)
{
@ -126,6 +129,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
/// </remarks>
public List<IConnectionAdapter> ConnectionAdapters { get; } = new List<IConnectionAdapter>();
public IServiceProvider ApplicationServices => KestrelServerOptions?.ApplicationServices;
internal Action<ListenOptions> Configure { get; set; } = _ => { };
/// <summary>
/// Gets the name of this endpoint to display on command-line when the web server starts.
/// </summary>
@ -149,5 +156,27 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
}
public override string ToString() => GetDisplayName();
public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware)
{
_components.Add(middleware);
return this;
}
public ConnectionDelegate Build()
{
ConnectionDelegate app = context =>
{
return Task.CompletedTask;
};
for (int i = _components.Count - 1; i >= 0; i--)
{
var component = _components[i];
app = component(app);
}
return app;
}
}
}

View File

@ -3,7 +3,7 @@ using System.Collections;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols.Features;
@ -17,12 +17,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
private static readonly Type IHttpConnectionFeatureType = typeof(IHttpConnectionFeature);
private static readonly Type IConnectionIdFeatureType = typeof(IConnectionIdFeature);
private static readonly Type IConnectionTransportFeatureType = typeof(IConnectionTransportFeature);
private static readonly Type IConnectionApplicationFeatureType = typeof(IConnectionApplicationFeature);
private object _currentIHttpConnectionFeature;
private object _currentIConnectionIdFeature;
private object _currentIConnectionTransportFeature;
private object _currentIConnectionApplicationFeature;
private int _featureRevision;
@ -99,12 +97,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
PipeFactory IConnectionTransportFeature.PipeFactory => PipeFactory;
IPipeConnection IConnectionTransportFeature.Connection
IPipeConnection IConnectionTransportFeature.Transport
{
get => Transport;
set => Transport = value;
}
IPipeConnection IConnectionTransportFeature.Application
{
get => Application;
set => Application = value;
}
Task IConnectionTransportFeature.ConnectionAborted
{
get => _abortTcs.Task;
}
Task IConnectionTransportFeature.ConnectionClosed
{
get => _closedTcs.Task;
}
object IFeatureCollection.this[Type key]
{
get => FastFeatureGet(key);
@ -142,11 +156,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
return _currentIConnectionTransportFeature;
}
if (key == IConnectionApplicationFeatureType)
{
return _currentIConnectionApplicationFeature;
}
return ExtraFeatureGet(key);
}
@ -172,12 +181,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
return;
}
if (key == IConnectionApplicationFeatureType)
{
_currentIConnectionApplicationFeature = feature;
return;
}
ExtraFeatureSet(key, feature);
}
@ -198,11 +201,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
yield return new KeyValuePair<Type, object>(IConnectionTransportFeatureType, _currentIConnectionTransportFeature);
}
if (_currentIConnectionApplicationFeature != null)
{
yield return new KeyValuePair<Type, object>(IConnectionApplicationFeatureType, _currentIConnectionApplicationFeature);
}
if (MaybeExtra != null)
{
foreach (var item in MaybeExtra)

View File

@ -1,14 +1,15 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Text;
using Microsoft.AspNetCore.Protocols.Features;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
{
public abstract partial class TransportConnection
{
private readonly TaskCompletionSource<object> _abortTcs = new TaskCompletionSource<object>();
private readonly TaskCompletionSource<object> _closedTcs = new TaskCompletionSource<object>();
public TransportConnection()
{
_currentIConnectionIdFeature = this;
@ -28,6 +29,30 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal
public virtual IScheduler OutputReaderScheduler { get; }
public IPipeConnection Transport { get; set; }
public IConnectionApplicationFeature Application => (IConnectionApplicationFeature)_currentIConnectionApplicationFeature;
public IPipeConnection Application { get; set; }
protected void Abort(Exception exception)
{
if (exception == null)
{
_abortTcs.TrySetResult(null);
}
else
{
_abortTcs.TrySetException(exception);
}
}
protected void Close(Exception exception)
{
if (exception == null)
{
_closedTcs.TrySetResult(null);
}
else
{
_closedTcs.TrySetException(exception);
}
}
}
}

View File

@ -46,8 +46,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
public IPipeWriter Input => Application.Connection.Output;
public IPipeReader Output => Application.Connection.Input;
public IPipeWriter Input => Application.Output;
public IPipeReader Output => Application.Input;
public LibuvOutputConsumer OutputConsumer { get; set; }
@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
// Now, complete the input so that no more reads can happen
Input.Complete(error ?? new ConnectionAbortedException());
Output.Complete(error);
Application.OnConnectionClosed(error);
Close(error);
// Make sure it isn't possible for a paused read to resume reading after calling uv_close
// on the stream handle
@ -178,7 +178,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
}
}
Application.Abort(error);
Abort(error);
// Complete after aborting the connection
Input.Complete(error);
}
@ -216,7 +216,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
Log.ConnectionReadFin(ConnectionId);
var error = new IOException(ex.Message, ex);
Application.Abort(error);
Abort(error);
Input.Complete(error);
}
}

View File

@ -48,8 +48,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
{
connectionHandler.OnConnection(this);
_input = Application.Connection.Output;
_output = Application.Connection.Input;
_input = Application.Output;
_output = Application.Input;
// Spawn send and receive logic
Task receiveTask = DoReceive();
@ -135,7 +135,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
}
finally
{
Application.Abort(error);
Abort(error);
_input.Complete(error);
}
}
@ -229,7 +229,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
}
finally
{
Application.OnConnectionClosed(error);
Close(error);
_output.Complete(error);
}
}

View File

@ -1,5 +1,7 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Protocols
@ -13,5 +15,9 @@ namespace Microsoft.AspNetCore.Protocols
public abstract IPipeConnection Transport { get; set; }
public abstract PipeFactory PipeFactory { get; }
public abstract Task ConnectionAborted { get; }
public abstract Task ConnectionClosed { get; }
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols.Features;
@ -34,10 +35,14 @@ namespace Microsoft.AspNetCore.Protocols
public override IPipeConnection Transport
{
get => ConnectionTransportFeature.Connection;
set => ConnectionTransportFeature.Connection = value;
get => ConnectionTransportFeature.Transport;
set => ConnectionTransportFeature.Transport = value;
}
public override Task ConnectionAborted => ConnectionTransportFeature.ConnectionAborted;
public override Task ConnectionClosed => ConnectionTransportFeature.ConnectionClosed;
struct FeatureInterfaces
{
public IConnectionIdFeature ConnectionId;

View File

@ -1,18 +0,0 @@
using System;
using System.IO.Pipelines;
namespace Microsoft.AspNetCore.Protocols.Features
{
public interface IConnectionApplicationFeature
{
IPipeConnection Connection { get; set; }
// TODO: Remove these (https://github.com/aspnet/KestrelHttpServer/issues/1772)
// REVIEW: These are around for now because handling pipe events messes with the order
// of operations an that breaks tons of tests. Instead, we preserve the existing semantics
// and ordering.
void Abort(Exception exception);
void OnConnectionClosed(Exception exception);
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Protocols.Features
{
@ -9,10 +10,16 @@ namespace Microsoft.AspNetCore.Protocols.Features
{
PipeFactory PipeFactory { get; }
IPipeConnection Connection { get; set; }
IPipeConnection Transport { get; set; }
IPipeConnection Application { get; set; }
IScheduler InputWriterScheduler { get; }
IScheduler OutputReaderScheduler { get; }
Task ConnectionAborted { get; }
Task ConnectionClosed { get; }
}
}

View File

@ -0,0 +1,21 @@
namespace System.IO.Pipelines
{
public static class PipeFactoryExtensions
{
public static (IPipeConnection Transport, IPipeConnection Application) CreateConnectionPair(this PipeFactory pipeFactory)
{
return pipeFactory.CreateConnectionPair(new PipeOptions(), new PipeOptions());
}
public static (IPipeConnection Transport, IPipeConnection Application) CreateConnectionPair(this PipeFactory pipeFactory, PipeOptions inputOptions, PipeOptions outputOptions)
{
var input = pipeFactory.Create(inputOptions);
var output = pipeFactory.Create(outputOptions);
var transportToApplication = new PipeConnection(output.Reader, input.Writer);
var applicationToTransport = new PipeConnection(input.Reader, output.Writer);
return (applicationToTransport, transportToApplication);
}
}
}

View File

@ -25,6 +25,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public FrameConnectionTests()
{
_pipeFactory = new PipeFactory();
var pair = _pipeFactory.CreateConnectionPair();
_frameConnectionContext = new FrameConnectionContext
{
@ -32,8 +33,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
ConnectionAdapters = new List<IConnectionAdapter>(),
PipeFactory = _pipeFactory,
FrameConnectionId = long.MinValue,
Input = _pipeFactory.Create(),
Output = _pipeFactory.Create(),
Application = pair.Application,
Transport = pair.Transport,
ServiceContext = new TestServiceContext
{
SystemClock = new SystemClock()
@ -54,7 +55,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockDebugger = new Mock<IDebugger>();
mockDebugger.SetupGet(g => g.IsAttached).Returns(true);
_frameConnection.Debugger = mockDebugger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
var now = DateTimeOffset.Now;
_frameConnection.Tick(now);
@ -101,7 +102,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_frameConnectionContext.ServiceContext.Log = logger;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
// Initialize timestamp
@ -128,7 +129,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
// Initialize timestamp
@ -170,7 +171,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
// Initialize timestamp
@ -247,7 +248,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
// Initialize timestamp
@ -315,7 +316,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
// Initialize timestamp
@ -377,7 +378,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
var startTime = systemClock.UtcNow;
@ -418,7 +419,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
_frameConnection.Frame.RequestAborted.Register(() =>
{
@ -452,7 +453,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
_frameConnection.Frame.RequestAborted.Register(() =>
{
@ -494,7 +495,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var mockLogger = new Mock<IKestrelTrace>();
_frameConnectionContext.ServiceContext.Log = mockLogger.Object;
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Input.Reader, _frameConnectionContext.Output);
_frameConnection.CreateFrame(new DummyApplication(), _frameConnectionContext.Transport, _frameConnectionContext.Application);
_frameConnection.Frame.Reset();
_frameConnection.Frame.RequestAborted.Register(() =>
{
@ -531,7 +532,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[Fact]
public async Task StartRequestProcessingCreatesLogScopeWithConnectionId()
{
_frameConnection.StartRequestProcessing(new DummyApplication());
_ = _frameConnection.StartRequestProcessing(new DummyApplication());
var scopeObjects = ((TestKestrelTrace)_frameConnectionContext.ServiceContext.Log)
.Logger

View File

@ -18,10 +18,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[Fact]
public void InitialDictionaryIsEmpty()
{
var factory = new PipeFactory();
var pair = factory.CreateConnectionPair();
var frameContext = new FrameContext
{
ServiceContext = new TestServiceContext(),
PipeFactory = new PipeFactory(),
PipeFactory = factory,
Application = pair.Application,
Transport = pair.Transport,
TimeoutControl = null
};

View File

@ -27,7 +27,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class FrameTests : IDisposable
{
private readonly IPipe _input;
private readonly IPipeConnection _transport;
private readonly IPipeConnection _application;
private readonly TestFrame<object> _frame;
private readonly ServiceContext _serviceContext;
private readonly FrameContext _frameContext;
@ -52,8 +53,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public FrameTests()
{
_pipelineFactory = new PipeFactory();
_input = _pipelineFactory.Create();
var output = _pipelineFactory.Create();
var pair = _pipelineFactory.CreateConnectionPair();
_transport = pair.Transport;
_application = pair.Application;
_serviceContext = new TestServiceContext();
_timeoutControl = new Mock<ITimeoutControl>();
@ -62,8 +65,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
ServiceContext = _serviceContext,
PipeFactory = _pipelineFactory,
TimeoutControl = _timeoutControl.Object,
Input = _input.Reader,
Output = output
Application = pair.Application,
Transport = pair.Transport
};
_frame = new TestFrame<object>(application: null, context: _frameContext);
@ -72,8 +75,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public void Dispose()
{
_input.Reader.Complete();
_input.Writer.Complete();
_transport.Input.Complete();
_application.Output.Complete();
_application.Input.Complete();
_application.Output.Complete();
_pipelineFactory.Dispose();
}
@ -84,11 +91,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_serviceContext.ServerOptions.Limits.MaxRequestHeadersTotalSize = headerLine.Length - 1;
_frame.Reset();
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine}\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine}\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() => _frame.TakeMessageHeaders(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.BadRequest_HeadersExceedMaxTotalSize, exception.Message);
Assert.Equal(StatusCodes.Status431RequestHeaderFieldsTooLarge, exception.StatusCode);
@ -100,11 +107,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
const string headerLines = "Header-1: value1\r\nHeader-2: value2\r\n";
_serviceContext.ServerOptions.Limits.MaxRequestHeaderCount = 1;
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"{headerLines}\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"{headerLines}\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() => _frame.TakeMessageHeaders(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.BadRequest_TooManyHeaders, exception.Message);
Assert.Equal(StatusCodes.Status431RequestHeaderFieldsTooLarge, exception.StatusCode);
@ -205,11 +212,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
options.Limits.MaxRequestHeaderCount = 1;
_serviceContext.ServerOptions = options;
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine1}\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine1}\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var takeMessageHeaders = _frame.TakeMessageHeaders(readableBuffer, out _consumed, out _examined);
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.True(takeMessageHeaders);
Assert.Equal(1, _frame.RequestHeaders.Count);
@ -217,11 +224,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_frame.Reset();
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine2}\r\n"));
readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"{headerLine2}\r\n"));
readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
takeMessageHeaders = _frame.TakeMessageHeaders(readableBuffer, out _consumed, out _examined);
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.True(takeMessageHeaders);
Assert.Equal(1, _frame.RequestHeaders.Count);
@ -332,8 +339,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
string requestLine,
string expectedMethod,
string expectedRawTarget,
// This warns that theory methods should use all of their parameters,
// but this method is using a shared data collection with HttpParserTests.ParsesRequestLine and others.
// This warns that theory methods should use all of their parameters,
// but this method is using a shared data collection with HttpParserTests.ParsesRequestLine and others.
#pragma warning disable xUnit1026
string expectedRawPath,
#pragma warning restore xUnit1026
@ -342,11 +349,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
string expectedHttpVersion)
{
var requestLineBytes = Encoding.ASCII.GetBytes(requestLine);
await _input.Writer.WriteAsync(requestLineBytes);
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(requestLineBytes);
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var returnValue = _frame.TakeStartLine(readableBuffer, out _consumed, out _examined);
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.True(returnValue);
Assert.Equal(expectedMethod, _frame.Method);
@ -365,11 +372,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
string expectedQueryString)
{
var requestLineBytes = Encoding.ASCII.GetBytes(requestLine);
await _input.Writer.WriteAsync(requestLineBytes);
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(requestLineBytes);
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var returnValue = _frame.TakeStartLine(readableBuffer, out _consumed, out _examined);
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.True(returnValue);
Assert.Equal(expectedRawTarget, _frame.RawTarget);
@ -380,10 +387,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[Fact]
public async Task ParseRequestStartsRequestHeadersTimeoutOnFirstByteAvailable()
{
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("G"));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes("G"));
_frame.ParseRequest((await _input.Reader.ReadAsync()).Buffer, out _consumed, out _examined);
_input.Reader.Advance(_consumed, _examined);
_frame.ParseRequest((await _transport.Input.ReadAsync()).Buffer, out _consumed, out _examined);
_transport.Input.Advance(_consumed, _examined);
var expectedRequestHeadersTimeout = _serviceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks;
_timeoutControl.Verify(cc => cc.ResetTimeout(expectedRequestHeadersTimeout, TimeoutAction.SendTimeoutResponse));
@ -395,11 +402,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_serviceContext.ServerOptions.Limits.MaxRequestLineSize = "GET / HTTP/1.1\r\n".Length;
var requestLineBytes = Encoding.ASCII.GetBytes("GET /a HTTP/1.1\r\n");
await _input.Writer.WriteAsync(requestLineBytes);
await _application.Output.WriteAsync(requestLineBytes);
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() => _frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.BadRequest_RequestLineTooLong, exception.Message);
Assert.Equal(StatusCodes.Status414UriTooLong, exception.StatusCode);
@ -409,12 +416,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[MemberData(nameof(TargetWithEncodedNullCharData))]
public async Task TakeStartLineThrowsOnEncodedNullCharInTarget(string target)
{
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestTarget_Detail(target), exception.Message);
}
@ -423,12 +430,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[MemberData(nameof(TargetWithNullCharData))]
public async Task TakeStartLineThrowsOnNullCharInTarget(string target)
{
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestTarget_Detail(target.EscapeNonPrintable()), exception.Message);
}
@ -439,12 +446,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
var requestLine = $"{method} / HTTP/1.1\r\n";
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestLine_Detail(requestLine.EscapeNonPrintable()), exception.Message);
}
@ -455,12 +462,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
var target = $"/{queryString}";
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"GET {target} HTTP/1.1\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestTarget_Detail(target.EscapeNonPrintable()), exception.Message);
}
@ -471,12 +478,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
var requestLine = $"{method} {target} HTTP/1.1\r\n";
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestTarget_Detail(target.EscapeNonPrintable()), exception.Message);
}
@ -485,12 +492,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
[MemberData(nameof(MethodNotAllowedTargetData))]
public async Task TakeStartLineThrowsWhenMethodNotAllowed(string requestLine, HttpMethod allowedMethod)
{
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(requestLine));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(405, exception.StatusCode);
Assert.Equal(CoreStrings.BadRequest_MethodNotAllowed, exception.Message);
@ -506,7 +513,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_timeoutControl.Verify(cc => cc.SetTimeout(expectedKeepAliveTimeout, TimeoutAction.CloseConnection));
_frame.Stop();
_input.Writer.Complete();
_application.Output.Complete();
requestProcessingTask.Wait();
}
@ -588,13 +595,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var requestProcessingTask = _frame.ProcessRequestsAsync();
var data = Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost:\r\n\r\n");
await _input.Writer.WriteAsync(data);
await _application.Output.WriteAsync(data);
_frame.Stop();
Assert.IsNotType<Task<Task>>(requestProcessingTask);
await requestProcessingTask.TimeoutAfter(TimeSpan.FromSeconds(10));
_input.Writer.Complete();
_application.Output.Complete();
}
[Fact]
@ -684,12 +691,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_serviceContext.Log = mockTrace.Object;
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes($"GET /%00 HTTP/1.1\r\n"));
var readableBuffer = (await _input.Reader.ReadAsync()).Buffer;
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes($"GET /%00 HTTP/1.1\r\n"));
var readableBuffer = (await _transport.Input.ReadAsync()).Buffer;
var exception = Assert.Throws<BadHttpRequestException>(() =>
_frame.TakeStartLine(readableBuffer, out _consumed, out _examined));
_input.Reader.Advance(_consumed, _examined);
_transport.Input.Advance(_consumed, _examined);
Assert.Equal(CoreStrings.FormatBadRequest_InvalidRequestTarget_Detail(string.Empty), exception.Message);
Assert.Equal(StatusCodes.Status400BadRequest, exception.StatusCode);
@ -716,19 +723,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var requestProcessingTask = _frame.ProcessRequestsAsync();
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders != null);
Assert.Equal(0, _frame.RequestHeaders.Count);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(headers0));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(headers0));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders.Count >= header0Count);
Assert.Equal(header0Count, _frame.RequestHeaders.Count);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(headers1));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(headers1));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders.Count >= header0Count + header1Count);
Assert.Equal(header0Count + header1Count, _frame.RequestHeaders.Count);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("\r\n"));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes("\r\n"));
Assert.Equal(header0Count + header1Count, _frame.RequestHeaders.Count);
await requestProcessingTask.TimeoutAfter(TimeSpan.FromSeconds(10));
@ -750,7 +757,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var requestProcessingTask = _frame.ProcessRequestsAsync();
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes("GET / HTTP/1.0\r\n"));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders != null);
Assert.Equal(0, _frame.RequestHeaders.Count);
@ -758,17 +765,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
_frame.RequestHeaders = newRequestHeaders;
Assert.Same(newRequestHeaders, _frame.RequestHeaders);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(headers0));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(headers0));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders.Count >= header0Count);
Assert.Same(newRequestHeaders, _frame.RequestHeaders);
Assert.Equal(header0Count, _frame.RequestHeaders.Count);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes(headers1));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes(headers1));
await WaitForCondition(TimeSpan.FromSeconds(1), () => _frame.RequestHeaders.Count >= header0Count + header1Count);
Assert.Same(newRequestHeaders, _frame.RequestHeaders);
Assert.Equal(header0Count + header1Count, _frame.RequestHeaders.Count);
await _input.Writer.WriteAsync(Encoding.ASCII.GetBytes("\r\n"));
await _application.Output.WriteAsync(Encoding.ASCII.GetBytes("\r\n"));
Assert.Same(newRequestHeaders, _frame.RequestHeaders);
Assert.Equal(header0Count + header1Count, _frame.RequestHeaders.Count);

View File

@ -73,8 +73,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private static readonly byte[] _noData = new byte[0];
private readonly PipeFactory _pipeFactory = new PipeFactory();
private readonly IPipe _inputPipe;
private readonly IPipe _outputPipe;
private readonly (IPipeConnection Transport, IPipeConnection Application) _pair;
private readonly Http2ConnectionContext _connectionContext;
private readonly Http2Connection _connection;
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
@ -99,8 +98,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public Http2ConnectionTests()
{
_inputPipe = _pipeFactory.Create();
_outputPipe = _pipeFactory.Create();
_pair = _pipeFactory.CreateConnectionPair();
_noopApplication = context => Task.CompletedTask;
@ -213,8 +211,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
ServiceContext = new TestServiceContext(),
PipeFactory = _pipeFactory,
Input = _inputPipe.Reader,
Output = _outputPipe
Application = _pair.Application,
Transport = _pair.Transport
};
_connection = new Http2Connection(_connectionContext);
}
@ -1201,7 +1199,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private async Task SendAsync(ArraySegment<byte> span)
{
var writableBuffer = _inputPipe.Writer.Alloc(1);
var writableBuffer = _pair.Application.Output.Alloc(1);
writableBuffer.Write(span);
await writableBuffer.FlushAsync();
}
@ -1413,7 +1411,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
while (true)
{
var result = await _outputPipe.Reader.ReadAsync();
var result = await _pair.Application.Input.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.Start;
var examined = buffer.End;
@ -1429,7 +1427,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
}
finally
{
_outputPipe.Reader.Advance(consumed, examined);
_pair.Application.Input.Advance(consumed, examined);
}
}
}
@ -1456,7 +1454,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
private Task StopConnectionAsync(int expectedLastStreamId, bool ignoreNonGoAwayFrames)
{
_inputPipe.Writer.Complete();
_pair.Application.Output.Complete();
return WaitForConnectionStopAsync(expectedLastStreamId, ignoreNonGoAwayFrames);
}
@ -1486,7 +1484,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.Equal(expectedErrorCode, frame.GoAwayErrorCode);
await _connectionTask;
_inputPipe.Writer.Complete();
_pair.Application.Output.Complete();
}
private async Task WaitForStreamErrorAsync(int expectedStreamId, Http2ErrorCode expectedErrorCode, bool ignoreNonRstStreamFrames)

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
@ -19,6 +18,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
d.NoDelay = false;
});
// Execute the callback
o1.ListenOptions[1].Configure(o1.ListenOptions[1]);
Assert.True(o1.ListenOptions[0].NoDelay);
Assert.False(o1.ListenOptions[1].NoDelay);
}

View File

@ -445,7 +445,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
// The block returned by IncomingStart always has at least 2048 available bytes,
// so no need to bounds check in this test.
var bytes = Encoding.ASCII.GetBytes(data[0]);
var buffer = input.Pipe.Writer.Alloc(2048);
var buffer = input.Application.Output.Alloc(2048);
ArraySegment<byte> block;
Assert.True(buffer.Buffer.TryGetArray(out block));
Buffer.BlockCopy(bytes, 0, block.Array, block.Offset, bytes.Length);
@ -457,7 +457,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
writeTcs = new TaskCompletionSource<byte[]>();
bytes = Encoding.ASCII.GetBytes(data[1]);
buffer = input.Pipe.Writer.Alloc(2048);
buffer = input.Application.Output.Alloc(2048);
Assert.True(buffer.Buffer.TryGetArray(out block));
Buffer.BlockCopy(bytes, 0, block.Array, block.Offset, bytes.Length);
buffer.Advance(bytes.Length);
@ -467,7 +467,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
if (headers.HeaderConnection == "close")
{
input.Pipe.Writer.Complete();
input.Application.Output.Complete();
}
await copyToAsyncTask;
@ -516,7 +516,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
input.Add("a");
Assert.Equal(1, await stream.ReadAsync(new byte[1], 0, 1));
input.Pipe.Reader.CancelPendingRead();
input.Transport.Input.CancelPendingRead();
// Add more input and verify is read
input.Add("b");

View File

@ -55,7 +55,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
var pipe = _pipeFactory.Create(pipeOptions);
var serviceContext = new TestServiceContext();
var socketOutput = new OutputProducer(
pipe,
pipe.Reader,
pipe.Writer,
"0",
serviceContext.Log,
Mock.Of<ITimeoutControl>());

View File

@ -1,7 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Testing;
@ -22,9 +24,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
serviceContext.ServerOptions.Limits.MaxResponseBufferSize = maxResponseBufferSize;
serviceContext.ThreadPool = new LoggingThreadPool(null);
var connectionHandler = new ConnectionHandler<object>(listenOptions: null, serviceContext: serviceContext, application: null);
var mockScheduler = Mock.Of<IScheduler>();
var outputPipeOptions = connectionHandler.GetOutputPipeOptions(readerScheduler: mockScheduler);
var outputPipeOptions = ConnectionHandler.GetOutputPipeOptions(serviceContext, readerScheduler: mockScheduler);
Assert.Equal(expectedMaximumSizeLow, outputPipeOptions.MaximumSizeLow);
Assert.Equal(expectedMaximumSizeHigh, outputPipeOptions.MaximumSizeHigh);
@ -41,9 +42,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
serviceContext.ServerOptions.Limits.MaxRequestBufferSize = maxRequestBufferSize;
serviceContext.ThreadPool = new LoggingThreadPool(null);
var connectionHandler = new ConnectionHandler<object>(listenOptions: null, serviceContext: serviceContext, application: null);
var mockScheduler = Mock.Of<IScheduler>();
var inputPipeOptions = connectionHandler.GetInputPipeOptions(writerScheduler: mockScheduler);
var inputPipeOptions = ConnectionHandler.GetInputPipeOptions(serviceContext, writerScheduler: mockScheduler);
Assert.Equal(expectedMaximumSizeLow, inputPipeOptions.MaximumSizeLow);
Assert.Equal(expectedMaximumSizeHigh, inputPipeOptions.MaximumSizeHigh);

View File

@ -20,12 +20,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
_memoryPool = new MemoryPool();
_pipelineFactory = new PipeFactory();
Pipe = _pipelineFactory.Create();
var pair = _pipelineFactory.CreateConnectionPair();
Transport = pair.Transport;
Application = pair.Application;
FrameContext = new FrameContext
{
ServiceContext = new TestServiceContext(),
Input = Pipe.Reader,
Application = Application,
Transport = Transport,
PipeFactory = _pipelineFactory,
TimeoutControl = Mock.Of<ITimeoutControl>()
};
@ -34,28 +37,30 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Frame.FrameControl = Mock.Of<IFrameControl>();
}
public IPipe Pipe { get; }
public IPipeConnection Transport { get; }
public IPipeConnection Application { get; }
public PipeFactory PipeFactory => _pipelineFactory;
public FrameContext FrameContext { get; }
public FrameContext FrameContext { get; }
public Frame Frame { get; set; }
public void Add(string text)
{
var data = Encoding.ASCII.GetBytes(text);
Pipe.Writer.WriteAsync(data).Wait();
Application.Output.WriteAsync(data).Wait();
}
public void Fin()
{
Pipe.Writer.Complete();
Application.Output.Complete();
}
public void Cancel()
{
Pipe.Reader.CancelPendingRead();
Transport.Input.CancelPendingRead();
}
public void Dispose()

View File

@ -3,6 +3,7 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
@ -37,7 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
private const int _connectionStartedEventId = 1;
private const int _connectionResetEventId = 19;
private const int _semaphoreWaitTimeout = 2500;
private static readonly int _semaphoreWaitTimeout = Debugger.IsAttached ? 10000 : 2500;
private readonly ITestOutputHelper _output;

View File

@ -683,7 +683,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
private OutputProducer CreateOutputProducer(PipeOptions pipeOptions, CancellationTokenSource cts = null)
{
var pipe = _pipeFactory.Create(pipeOptions);
var pair = _pipeFactory.CreateConnectionPair(pipeOptions, pipeOptions);
var logger = new TestApplicationErrorLogger();
var serviceContext = new TestServiceContext
@ -694,14 +694,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var transportContext = new TestLibuvTransportContext { Log = new LibuvTrace(logger) };
var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log);
var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log);
var consumer = new LibuvOutputConsumer(pair.Application.Input, _libuvThread, socket, "0", transportContext.Log);
var frame = new Frame<object>(null, new FrameContext
{
ServiceContext = serviceContext,
PipeFactory = _pipeFactory,
TimeoutControl = Mock.Of<ITimeoutControl>(),
Output = pipe
Application = pair.Application,
Transport = pair.Transport
});
if (cts != null)
@ -709,7 +710,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
frame.RequestAborted.Register(cts.Cancel);
}
var ignore = WriteOutputAsync(consumer, pipe.Reader, frame);
var ignore = WriteOutputAsync(consumer, pair.Application.Input, frame);
return frame.Output;
}

View File

@ -53,10 +53,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
[MemberData(nameof(ConnectionAdapterData))]
public async Task ConnectionCanReadAndWrite(ListenOptions listenOptions)
{
var serviceContext = new TestServiceContext();
listenOptions.UseHttpServer(listenOptions.ConnectionAdapters, serviceContext, new DummyApplication(TestApp.EchoApp));
var transportContext = new TestLibuvTransportContext()
{
ConnectionHandler = new ConnectionHandler<HttpContext>(listenOptions, new TestServiceContext(), new DummyApplication(TestApp.EchoApp))
ConnectionHandler = new ConnectionHandler(serviceContext, listenOptions.Build())
};
var transport = new LibuvTransport(transportContext, listenOptions);
await transport.BindAsync();

View File

@ -2,11 +2,13 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
@ -25,17 +27,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
public async Task ConnectionsGetRoundRobinedToSecondaryListeners()
{
var libuv = new LibuvFunctions();
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
var serviceContextPrimary = new TestServiceContext();
var transportContextPrimary = new TestLibuvTransportContext();
transportContextPrimary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
var builderPrimary = new ConnectionBuilder();
builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
transportContextPrimary.ConnectionHandler = new ConnectionHandler(serviceContextPrimary, builderPrimary.Build());
var serviceContextSecondary = new TestServiceContext();
var builderSecondary = new ConnectionBuilder();
builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
var transportContextSecondary = new TestLibuvTransportContext();
transportContextSecondary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
transportContextSecondary.ConnectionHandler = new ConnectionHandler(serviceContextSecondary, builderSecondary.Build());
var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
@ -92,13 +97,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
{
var libuv = new LibuvFunctions();
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
var logger = new TestApplicationErrorLogger();
var serviceContextPrimary = new TestServiceContext();
var builderPrimary = new ConnectionBuilder();
builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
var transportContextPrimary = new TestLibuvTransportContext() { Log = new LibuvTrace(logger) };
transportContextPrimary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
transportContextPrimary.ConnectionHandler = new ConnectionHandler(serviceContextPrimary, builderPrimary.Build());
var serviceContextSecondary = new TestServiceContext
{
@ -107,9 +112,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
ThreadPool = serviceContextPrimary.ThreadPool,
HttpParserFactory = serviceContextPrimary.HttpParserFactory,
};
var builderSecondary = new ConnectionBuilder();
builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
var transportContextSecondary = new TestLibuvTransportContext();
transportContextSecondary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
transportContextSecondary.ConnectionHandler = new ConnectionHandler(serviceContextSecondary, builderSecondary.Build());
var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
@ -205,9 +211,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
var logger = new TestApplicationErrorLogger();
var serviceContextPrimary = new TestServiceContext();
var builderPrimary = new ConnectionBuilder();
builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
var transportContextPrimary = new TestLibuvTransportContext() { Log = new LibuvTrace(logger) };
transportContextPrimary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")));
transportContextPrimary.ConnectionHandler = new ConnectionHandler(serviceContextPrimary, builderPrimary.Build());
var serviceContextSecondary = new TestServiceContext
{
@ -216,9 +223,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
ThreadPool = serviceContextPrimary.ThreadPool,
HttpParserFactory = serviceContextPrimary.HttpParserFactory,
};
var builderSecondary = new ConnectionBuilder();
builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
var transportContextSecondary = new TestLibuvTransportContext();
transportContextSecondary.ConnectionHandler = new ConnectionHandler<HttpContext>(
listenOptions, serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")));
transportContextSecondary.ConnectionHandler = new ConnectionHandler(serviceContextSecondary, builderSecondary.Build());
var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
@ -300,5 +308,34 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
return new Uri($"{scheme}://{options.IPEndPoint}");
}
private class ConnectionBuilder : IConnectionBuilder
{
private readonly List<Func<ConnectionDelegate, ConnectionDelegate>> _components = new List<Func<ConnectionDelegate, ConnectionDelegate>>();
public IServiceProvider ApplicationServices { get; set; }
public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware)
{
_components.Add(middleware);
return this;
}
public ConnectionDelegate Build()
{
ConnectionDelegate app = context =>
{
return Task.CompletedTask;
};
for (int i = _components.Count - 1; i >= 0; i--)
{
var component = _components[i];
app = component(app);
}
return app;
}
}
}
}

View File

@ -22,29 +22,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers
Input = connectionContext.PipeFactory.Create(InputOptions ?? new PipeOptions());
Output = connectionContext.PipeFactory.Create(OutputOptions ?? new PipeOptions());
var context = new TestConnectionContext
{
Connection = new PipeConnection(Output.Reader, Input.Writer)
};
var feature = connectionContext.Features.Get<IConnectionTransportFeature>();
connectionContext.Features.Set<IConnectionApplicationFeature>(context);
connectionContext.Transport = new PipeConnection(Input.Reader, Output.Writer);
feature.Application = new PipeConnection(Output.Reader, Input.Writer);
}
public IPipe Input { get; private set; }
public IPipe Output { get; private set; }
private class TestConnectionContext : IConnectionApplicationFeature
{
public string ConnectionId { get; }
public IPipeConnection Connection { get; set; }
public void Abort(Exception ex)
{
}
public void OnConnectionClosed(Exception ex)
{
}
}
}
}