Remove connection adapters and move things to middleware (#11412)

* Remove connection adapters and move things to middleware
- Remove connection adapters from the public API surface (pubternal) and replace the existing adapters with connection middleware.
- Updated the tests
This commit is contained in:
David Fowler 2019-06-21 00:26:09 +02:00 committed by GitHub
parent c5d62b4a29
commit 25d568885b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 356 additions and 878 deletions

View File

@ -143,7 +143,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
{
internal ListenOptions() { }
public System.IServiceProvider ApplicationServices { get { throw null; } }
public System.Collections.Generic.List<Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal.IConnectionAdapter> ConnectionAdapters { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public ulong FileHandle { get { throw null; } }
public System.Net.IPEndPoint IPEndPoint { get { throw null; } }
public Microsoft.AspNetCore.Server.Kestrel.Core.KestrelServerOptions KestrelServerOptions { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
@ -160,24 +159,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
public System.TimeSpan GracePeriod { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
}
}
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public partial class ConnectionAdapterContext
{
internal ConnectionAdapterContext() { }
public System.IO.Stream ConnectionStream { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public Microsoft.AspNetCore.Http.Features.IFeatureCollection Features { get { throw null; } }
}
public partial interface IAdaptedConnection : System.IDisposable
{
System.IO.Stream ConnectionStream { get; }
}
public partial interface IConnectionAdapter
{
bool IsHttps { get; }
System.Threading.Tasks.Task<Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal.IAdaptedConnection> OnConnectionAsync(Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal.ConnectionAdapterContext context);
}
}
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Features
{
public partial interface IConnectionTimeoutFeature

View File

@ -1,173 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
internal class AdaptedPipeline : IDuplexPipe
{
private readonly int _minAllocBufferSize;
private Task _inputTask;
private Task _outputTask;
public AdaptedPipeline(IDuplexPipe transport,
Pipe inputPipe,
Pipe outputPipe,
ILogger log,
int minAllocBufferSize)
{
TransportStream = new RawStream(transport.Input, transport.Output, throwOnCancelled: true);
Input = inputPipe;
Output = outputPipe;
Log = log;
_minAllocBufferSize = minAllocBufferSize;
}
public RawStream TransportStream { get; }
public Pipe Input { get; }
public Pipe Output { get; }
public ILogger Log { get; }
PipeReader IDuplexPipe.Input => Input.Reader;
PipeWriter IDuplexPipe.Output => Output.Writer;
public void RunAsync(Stream stream)
{
_inputTask = ReadInputAsync(stream);
_outputTask = WriteOutputAsync(stream);
}
public async Task CompleteAsync()
{
Output.Writer.Complete();
Input.Reader.Complete();
if (_outputTask == null)
{
return;
}
// Wait for the output task to complete, this ensures that we've copied
// the application data to the underlying stream
await _outputTask;
// Cancel the underlying stream so that the input task yields
TransportStream.CancelPendingRead();
// The input task should yield now that we've cancelled it
await _inputTask;
}
private async Task WriteOutputAsync(Stream stream)
{
try
{
if (stream == null)
{
return;
}
while (true)
{
var result = await Output.Reader.ReadAsync();
var buffer = result.Buffer;
try
{
if (buffer.IsEmpty)
{
if (result.IsCompleted)
{
break;
}
await stream.FlushAsync();
}
else if (buffer.IsSingleSegment)
{
await stream.WriteAsync(buffer.First);
}
else
{
foreach (var memory in buffer)
{
await stream.WriteAsync(memory);
}
}
}
finally
{
Output.Reader.AdvanceTo(buffer.End);
}
}
}
catch (Exception ex)
{
Log.LogError(0, ex, $"{nameof(AdaptedPipeline)}.{nameof(WriteOutputAsync)}");
}
finally
{
Output.Reader.Complete();
}
}
private async Task ReadInputAsync(Stream stream)
{
Exception error = null;
try
{
if (stream == null)
{
// REVIEW: Do we need an exception here?
return;
}
while (true)
{
var outputBuffer = Input.Writer.GetMemory(_minAllocBufferSize);
var bytesRead = await stream.ReadAsync(outputBuffer);
Input.Writer.Advance(bytesRead);
if (bytesRead == 0)
{
// FIN
break;
}
var result = await Input.Writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
}
catch (OperationCanceledException ex)
{
// Propagate the exception if it's ConnectionAbortedException
error = ex as ConnectionAbortedException;
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
Input.Writer.Complete(error);
}
}
}
}

View File

@ -1,26 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
// Even though this only includes the non-adapted ConnectionStream currently, this is a context in case
// we want to add more connection metadata later.
public class ConnectionAdapterContext
{
internal ConnectionAdapterContext(ConnectionContext connectionContext, Stream connectionStream)
{
ConnectionContext = connectionContext;
ConnectionStream = connectionStream;
}
internal ConnectionContext ConnectionContext { get; }
public IFeatureCollection Features => ConnectionContext.Features;
public Stream ConnectionStream { get; }
}
}

View File

@ -1,13 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public interface IAdaptedConnection : IDisposable
{
Stream ConnectionStream { get; }
}
}

View File

@ -1,13 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public interface IConnectionAdapter
{
bool IsHttps { get; }
Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context);
}
}

View File

@ -1,47 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
internal class LoggingConnectionAdapter : IConnectionAdapter
{
private readonly ILogger _logger;
public LoggingConnectionAdapter(ILogger logger)
{
if (logger == null)
{
throw new ArgumentNullException(nameof(logger));
}
_logger = logger;
}
public bool IsHttps => false;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
{
return Task.FromResult<IAdaptedConnection>(
new LoggingAdaptedConnection(context.ConnectionStream, _logger));
}
private class LoggingAdaptedConnection : IAdaptedConnection
{
public LoggingAdaptedConnection(Stream rawStream, ILogger logger)
{
ConnectionStream = new LoggingStream(rawStream, logger);
}
public Stream ConnectionStream { get; }
public void Dispose()
{
}
}
}
}

View File

@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
@ -97,11 +96,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https
_handshakeTimeout = value != Timeout.InfiniteTimeSpan ? value : TimeSpan.MaxValue;
}
}
internal PipeScheduler Scheduler { get; set; } = PipeScheduler.ThreadPool;
internal long? MaxInputBufferSize { get; set; }
internal long? MaxOutputBufferSize { get; set; }
}
}

View File

@ -171,8 +171,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
var httpsDefault = ParseAddress(Constants.DefaultServerHttpsAddress, out https);
context.ServerOptions.ApplyEndpointDefaults(httpsDefault);
if (httpsDefault.ConnectionAdapters.Any(f => f.IsHttps)
|| httpsDefault.TryUseHttps())
if (httpsDefault.IsTls || httpsDefault.TryUseHttps())
{
await httpsDefault.BindAsync(context).ConfigureAwait(false);
context.Logger.LogDebug(CoreStrings.BindingToDefaultAddresses,
@ -255,7 +254,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
var options = ParseAddress(address, out var https);
context.ServerOptions.ApplyEndpointDefaults(options);
if (https && !options.ConnectionAdapters.Any(f => f.IsHttps))
if (https && !options.IsTls)
{
options.UseHttps();
}

View File

@ -86,7 +86,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
catch (Exception ex)
{
Log.LogCritical(0, ex, $"{nameof(ConnectionDispatcher)}.{nameof(Execute)}() {connectionContext.ConnectionId}");
Log.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
}
}
}

View File

@ -2,10 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Threading.Tasks;
@ -13,7 +10,6 @@ using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2;
@ -30,9 +26,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
private readonly ISystemClock _systemClock;
private readonly TimeoutControl _timeoutControl;
private IList<IAdaptedConnection> _adaptedConnections;
private IDuplexPipe _adaptedTransport;
private readonly object _protocolSelectionLock = new object();
private ProtocolSelectionState _protocolSelectionState = ProtocolSelectionState.Initializing;
private IRequestProcessor _requestProcessor;
@ -50,54 +43,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public IPEndPoint LocalEndPoint => _context.LocalEndPoint;
public IPEndPoint RemoteEndPoint => _context.RemoteEndPoint;
private MemoryPool<byte> MemoryPool => _context.MemoryPool;
// Internal for testing
internal PipeOptions AdaptedInputPipeOptions => new PipeOptions
(
pool: MemoryPool,
readerScheduler: _context.ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
resumeWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
useSynchronizationContext: false,
minimumSegmentSize: MemoryPool.GetMinimumSegmentSize()
);
internal PipeOptions AdaptedOutputPipeOptions => new PipeOptions
(
pool: MemoryPool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxResponseBufferSize ?? 0,
resumeWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxResponseBufferSize ?? 0,
useSynchronizationContext: false,
minimumSegmentSize: MemoryPool.GetMinimumSegmentSize()
);
private IKestrelTrace Log => _context.ServiceContext.Log;
public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> httpApplication)
{
try
{
AdaptedPipeline adaptedPipeline = null;
// _adaptedTransport must be set prior to wiring up callbacks
// to allow the connection to be aborted prior to protocol selection.
_adaptedTransport = _context.Transport;
if (_context.ConnectionAdapters.Count > 0)
{
adaptedPipeline = new AdaptedPipeline(_adaptedTransport,
new Pipe(AdaptedInputPipeOptions),
new Pipe(AdaptedOutputPipeOptions),
Log,
MemoryPool.GetMinimumAllocSize());
_adaptedTransport = adaptedPipeline;
}
// This feature should never be null in Kestrel
var connectionHeartbeatFeature = _context.ConnectionFeatures.Get<IConnectionHeartbeatFeature>();
@ -116,13 +67,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_context.ConnectionFeatures.Set<IConnectionTimeoutFeature>(_timeoutControl);
if (adaptedPipeline != null)
{
// Stream can be null here and run async will close the connection in that case
var stream = await ApplyConnectionAdaptersAsync(adaptedPipeline.TransportStream);
adaptedPipeline.RunAsync(stream);
}
IRequestProcessor requestProcessor = null;
lock (_protocolSelectionLock)
@ -130,7 +74,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
// Ensure that the connection hasn't already been stopped.
if (_protocolSelectionState == ProtocolSelectionState.Initializing)
{
var derivedContext = CreateDerivedContext(_adaptedTransport);
var derivedContext = CreateDerivedContext(_context.Transport);
switch (SelectProtocol())
{
@ -169,9 +113,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
await requestProcessor.ProcessRequestsAsync(httpApplication);
}
}
// Complete the pipeline after the method runs
await (adaptedPipeline?.CompleteAsync() ?? Task.CompletedTask);
}
}
catch (Exception ex)
@ -180,8 +121,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
finally
{
DisposeAdaptedConnections();
if (_http1Connection?.IsUpgraded == true)
{
_context.ServiceContext.ConnectionManager.UpgradedConnectionCount.ReleaseOne();
@ -234,7 +173,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
switch (previousState)
{
case ProtocolSelectionState.Initializing:
CloseUninitializedConnection(new ConnectionAbortedException(CoreStrings.ServerShutdownDuringConnectionInitialization));
_context.ConnectionContext.Abort(new ConnectionAbortedException(CoreStrings.ServerShutdownDuringConnectionInitialization));
break;
case ProtocolSelectionState.Selected:
_requestProcessor.StopProcessingNextRequest();
@ -268,7 +207,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
// ConnectionClosed callback is not wired up until after leaving the Initializing state.
Debug.Assert(false);
CloseUninitializedConnection(new ConnectionAbortedException("HttpConnection.OnInputOrOutputCompleted() called while in the ProtocolSelectionState.Initializing state!?"));
_context.ConnectionContext.Abort(new ConnectionAbortedException("HttpConnection.OnInputOrOutputCompleted() called while in the ProtocolSelectionState.Initializing state!?"));
break;
case ProtocolSelectionState.Selected:
_requestProcessor.OnInputOrOutputCompleted();
@ -291,7 +230,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
switch (previousState)
{
case ProtocolSelectionState.Initializing:
CloseUninitializedConnection(ex);
_context.ConnectionContext.Abort(ex);
break;
case ProtocolSelectionState.Selected:
_requestProcessor.Abort(ex);
@ -301,43 +240,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
}
}
private async Task<Stream> ApplyConnectionAdaptersAsync(RawStream stream)
{
var connectionAdapters = _context.ConnectionAdapters;
var adapterContext = new ConnectionAdapterContext(_context.ConnectionContext, stream);
_adaptedConnections = new List<IAdaptedConnection>(connectionAdapters.Count);
try
{
for (var i = 0; i < connectionAdapters.Count; i++)
{
var adaptedConnection = await connectionAdapters[i].OnConnectionAsync(adapterContext);
_adaptedConnections.Add(adaptedConnection);
adapterContext = new ConnectionAdapterContext(_context.ConnectionContext, adaptedConnection.ConnectionStream);
}
}
catch (Exception ex)
{
Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}.");
return null;
}
return adapterContext.ConnectionStream;
}
private void DisposeAdaptedConnections()
{
var adaptedConnections = _adaptedConnections;
if (adaptedConnections != null)
{
for (var i = adaptedConnections.Count - 1; i >= 0; i--)
{
adaptedConnections[i].Dispose();
}
}
}
private HttpProtocols SelectProtocol()
{
var hasTls = _context.ConnectionFeatures.Get<ITlsConnectionFeature>() != null;
@ -388,17 +290,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
_requestProcessor?.Tick(now);
}
private void CloseUninitializedConnection(ConnectionAbortedException abortReason)
{
_context.ConnectionContext.Abort(abortReason);
if (_context.ConnectionAdapters.Count > 0)
{
_adaptedTransport.Input.Complete();
_adaptedTransport.Output.Complete();
}
}
public void OnTimeout(TimeoutReason reason)
{
// In the cases that don't log directly here, we expect the setter of the timeout to also be the input

View File

@ -5,7 +5,6 @@ using System;
using System.Collections.Generic;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
@ -13,12 +12,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public static IConnectionBuilder UseHttpServer<TContext>(this IConnectionBuilder builder, ServiceContext serviceContext, IHttpApplication<TContext> application, HttpProtocols protocols)
{
return builder.UseHttpServer(Array.Empty<IConnectionAdapter>(), serviceContext, application, protocols);
}
public static IConnectionBuilder UseHttpServer<TContext>(this IConnectionBuilder builder, IList<IConnectionAdapter> adapters, ServiceContext serviceContext, IHttpApplication<TContext> application, HttpProtocols protocols)
{
var middleware = new HttpConnectionMiddleware<TContext>(adapters, serviceContext, application, protocols);
var middleware = new HttpConnectionMiddleware<TContext>(serviceContext, application, protocols);
return builder.Use(next =>
{
return middleware.OnConnectionAsync;

View File

@ -7,7 +7,6 @@ using System.IO.Pipelines;
using System.Net;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
@ -19,7 +18,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
public ConnectionContext ConnectionContext { get; set; }
public ServiceContext ServiceContext { get; set; }
public IFeatureCollection ConnectionFeatures { get; set; }
public IList<IConnectionAdapter> ConnectionAdapters { get; set; }
public MemoryPool<byte> MemoryPool { get; set; }
public IPEndPoint LocalEndPoint { get; set; }
public IPEndPoint RemoteEndPoint { get; set; }

View File

@ -1,38 +1,29 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
internal class HttpConnectionMiddleware<TContext>
{
private readonly IList<IConnectionAdapter> _connectionAdapters;
private readonly ServiceContext _serviceContext;
private readonly IHttpApplication<TContext> _application;
private readonly HttpProtocols _protocols;
public HttpConnectionMiddleware(IList<IConnectionAdapter> adapters, ServiceContext serviceContext, IHttpApplication<TContext> application, HttpProtocols protocols)
public HttpConnectionMiddleware(ServiceContext serviceContext, IHttpApplication<TContext> application, HttpProtocols protocols)
{
_serviceContext = serviceContext;
_application = application;
_protocols = protocols;
// Keeping these around for now so progress can be made without updating tests
_connectionAdapters = adapters;
}
public Task OnConnectionAsync(ConnectionContext connectionContext)
{
// We need the transport feature so that we can cancel the output reader that the transport is using
// This is a bit of a hack but it preserves the existing semantics
var memoryPoolFeature = connectionContext.Features.Get<IMemoryPoolFeature>();
var httpConnectionContext = new HttpConnectionContext
@ -43,7 +34,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
ServiceContext = _serviceContext,
ConnectionFeatures = connectionContext.Features,
MemoryPool = memoryPoolFeature.MemoryPool,
ConnectionAdapters = _connectionAdapters,
Transport = connectionContext.Transport
};

View File

@ -15,7 +15,6 @@ using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.Extensions.Logging;
@ -26,7 +25,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
internal class HttpsConnectionMiddleware
{
private readonly ConnectionDelegate _next;
private readonly HttpsConnectionAdapterOptions _options;
private readonly ILogger _logger;
private readonly X509Certificate2 _serverCertificate;
@ -80,49 +78,37 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
private async Task InnerOnConnectionAsync(ConnectionContext context)
{
SslStream sslStream;
bool certificateRequired;
var feature = new Core.Internal.TlsConnectionFeature();
context.Features.Set<ITlsConnectionFeature>(feature);
context.Features.Set<ITlsHandshakeFeature>(feature);
// TODO: Handle the cases where this can be null
var memoryPoolFeature = context.Features.Get<IMemoryPoolFeature>();
var memoryPool = context.Features.Get<IMemoryPoolFeature>()?.MemoryPool;
var inputPipeOptions = new PipeOptions
var inputPipeOptions = new StreamPipeReaderOptions
(
pool: memoryPoolFeature.MemoryPool,
readerScheduler: _options.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: _options.MaxInputBufferSize ?? 0,
resumeWriterThreshold: _options.MaxInputBufferSize / 2 ?? 0,
useSynchronizationContext: false,
minimumSegmentSize: memoryPoolFeature.MemoryPool.GetMinimumSegmentSize()
pool: memoryPool,
bufferSize: memoryPool.GetMinimumSegmentSize(),
minimumReadSize: memoryPool.GetMinimumAllocSize(),
leaveOpen: true
);
var outputPipeOptions = new PipeOptions
var outputPipeOptions = new StreamPipeWriterOptions
(
pool: memoryPoolFeature.MemoryPool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: _options.MaxOutputBufferSize ?? 0,
resumeWriterThreshold: _options.MaxOutputBufferSize / 2 ?? 0,
useSynchronizationContext: false,
minimumSegmentSize: memoryPoolFeature.MemoryPool.GetMinimumSegmentSize()
pool: memoryPool,
leaveOpen: true
);
// TODO: eventually make SslDuplexStream : Stream, IDuplexPipe to avoid RawStream allocation and pipe allocations
var adaptedPipeline = new AdaptedPipeline(context.Transport, new Pipe(inputPipeOptions), new Pipe(outputPipeOptions), _logger, memoryPoolFeature.MemoryPool.GetMinimumAllocSize());
var transportStream = adaptedPipeline.TransportStream;
SslDuplexPipe sslDuplexPipe = null;
if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate)
{
sslStream = new SslStream(transportStream);
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions);
certificateRequired = false;
}
else
{
sslStream = new SslStream(transportStream,
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions, s => new SslStream(s,
leaveInnerStreamOpen: false,
userCertificateValidationCallback: (sender, certificate, chain, sslPolicyErrors) =>
{
@ -154,11 +140,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
}
return true;
});
}));
certificateRequired = true;
}
var sslStream = sslDuplexPipe.Stream;
using (var cancellationTokeSource = new CancellationTokenSource(_options.HandshakeTimeout))
using (cancellationTokeSource.Token.UnsafeRegister(state => ((ConnectionContext)state).Abort(), context))
{
@ -210,13 +198,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
catch (OperationCanceledException)
{
_logger?.LogDebug(2, CoreStrings.AuthenticationTimedOut);
sslStream.Dispose();
await sslStream.DisposeAsync();
return;
}
catch (Exception ex) when (ex is IOException || ex is AuthenticationException)
{
_logger?.LogDebug(1, ex, CoreStrings.AuthenticationFailed);
sslStream.Dispose();
await sslStream.DisposeAsync();
return;
}
}
@ -232,30 +220,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
feature.KeyExchangeStrength = sslStream.KeyExchangeStrength;
feature.Protocol = sslStream.SslProtocol;
var original = context.Transport;
var originalTransport = context.Transport;
try
{
context.Transport = adaptedPipeline;
context.Transport = sslDuplexPipe;
using (sslStream)
// Disposing the stream will dispose the sslDuplexPipe
await using (sslStream)
{
try
{
adaptedPipeline.RunAsync(sslStream);
await _next(context);
}
finally
{
await adaptedPipeline.CompleteAsync();
}
await _next(context);
}
}
finally
{
// Restore the original so that it gets closed appropriately
context.Transport = original;
context.Transport = originalTransport;
}
}
@ -281,5 +261,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal
return new X509Certificate2(certificate);
}
private class SslDuplexPipe : DuplexPipeStreamAdapter<SslStream>
{
public SslDuplexPipe(IDuplexPipe transport, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions)
: this(transport, readerOptions, writerOptions, s => new SslStream(s))
{
}
public SslDuplexPipe(IDuplexPipe transport, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, SslStream> factory) :
base(transport, readerOptions, writerOptions, factory)
{
}
}
}
}

View File

@ -124,7 +124,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
async Task OnBind(ListenOptions options)
{
// Add the HTTP middleware as the terminal connection middleware
options.UseHttpServer(options.ConnectionAdapters, ServiceContext, application, options.Protocols);
options.UseHttpServer(ServiceContext, application, options.Protocols);
var connectionDelegate = options.Build();

View File

@ -3,12 +3,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.Core
@ -63,7 +61,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
public ulong FileHandle => (EndPoint as FileHandleEndPoint)?.FileHandle ?? 0;
/// <summary>
/// Enables an <see cref="IConnectionAdapter"/> to resolve and use services registered by the application during startup.
/// Enables connection middleware to resolve and use services registered by the application during startup.
/// Only set if accessed from the callback of a <see cref="KestrelServerOptions"/> Listen* method.
/// </summary>
public KestrelServerOptions KestrelServerOptions { get; internal set; }
@ -74,19 +72,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
/// <remarks>Defaults to HTTP/1.x and HTTP/2.</remarks>
public HttpProtocols Protocols { get; set; } = HttpProtocols.Http1AndHttp2;
/// <summary>
/// Gets the <see cref="List{IConnectionAdapter}"/> that allows each connection <see cref="System.IO.Stream"/>
/// to be intercepted and transformed.
/// Configured by the <c>UseHttps()</c> and <see cref="Hosting.ListenOptionsConnectionLoggingExtensions.UseConnectionLogging(ListenOptions)"/>
/// extension methods.
/// </summary>
/// <remarks>
/// Defaults to empty.
/// </remarks>
#pragma warning disable PUB0001 // Pubternal type in public API
public List<IConnectionAdapter> ConnectionAdapters { get; } = new List<IConnectionAdapter>();
#pragma warning restore PUB0001 // Pubternal type in public API
public IServiceProvider ApplicationServices => KestrelServerOptions?.ApplicationServices;
internal string Scheme
@ -125,6 +110,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
public override string ToString() => GetDisplayName();
/// <summary>
/// Adds a middleware delegate to the connection pipeline.
/// Configured by the <c>UseHttps()</c> and <see cref="Hosting.ListenOptionsConnectionLoggingExtensions.UseConnectionLogging(ListenOptions)"/>
/// extension methods.
/// </summary>
/// <param name="middleware">The middleware delegate.</param>
/// <returns>The <see cref="IConnectionBuilder"/>.</returns>
public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware)
{
_middleware.Add(middleware);

View File

@ -26,7 +26,7 @@ namespace Microsoft.AspNetCore.Hosting
/// <param name="listenOptions">The <see cref="ListenOptions"/> to configure.</param>
/// <returns>The <see cref="ListenOptions"/>.</returns>
public static ListenOptions UseHttps(this ListenOptions listenOptions) => listenOptions.UseHttps(_ => { });
/// <summary>
/// Configure Kestrel to use HTTPS.
/// </summary>
@ -218,9 +218,6 @@ namespace Microsoft.AspNetCore.Hosting
// Set the list of protocols from listen options
httpsOptions.HttpProtocols = listenOptions.Protocols;
httpsOptions.MaxInputBufferSize = listenOptions.KestrelServerOptions?.Limits.MaxRequestBufferSize;
httpsOptions.MaxOutputBufferSize = listenOptions.KestrelServerOptions?.Limits.MaxResponseBufferSize;
listenOptions.IsTls = true;
listenOptions.Use(next =>

View File

@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
@ -78,7 +77,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
};
options._middleware.AddRange(_middleware);
options.ConnectionAdapters.AddRange(ConnectionAdapters);
return options;
}
}

View File

@ -8,16 +8,16 @@ using System.Threading;
using System.Threading.Tasks;
using System.Buffers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
internal sealed class RawStream : Stream
internal class DuplexPipeStream : Stream
{
private readonly PipeReader _input;
private readonly PipeWriter _output;
private readonly bool _throwOnCancelled;
private volatile bool _cancelCalled;
public RawStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
public DuplexPipeStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
{
_input = input;
_output = output;

View File

@ -0,0 +1,49 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
/// <summary>
/// A helper for wrapping a Stream decorator from an <see cref="IDuplexPipe"/>.
/// </summary>
/// <typeparam name="TStream"></typeparam>
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
{
}
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) : base(duplexPipe.Input, duplexPipe.Output)
{
Stream = createStream(this);
Input = PipeReader.Create(Stream, readerOptions);
Output = PipeWriter.Create(Stream, writerOptions);
}
public TStream Stream { get; }
public PipeReader Input { get; }
public PipeWriter Output { get; }
protected override void Dispose(bool disposing)
{
Input.Complete();
Output.Complete();
base.Dispose(disposing);
}
public override ValueTask DisposeAsync()
{
Input.Complete();
Output.Complete();
return base.DisposeAsync();
}
}
}

View File

@ -0,0 +1,50 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
internal class LoggingConnectionMiddleware
{
private readonly ConnectionDelegate _next;
private readonly ILogger _logger;
public LoggingConnectionMiddleware(ConnectionDelegate next, ILogger logger)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task OnConnectionAsync(ConnectionContext context)
{
var oldTransport = context.Transport;
try
{
await using (var loggingDuplexPipe = new LoggingDuplexPipe(context.Transport, _logger))
{
context.Transport = loggingDuplexPipe;
await _next(context);
}
}
finally
{
context.Transport = oldTransport;
}
}
private class LoggingDuplexPipe : DuplexPipeStreamAdapter<LoggingStream>
{
public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
base(transport, stream => new LoggingStream(stream, logger))
{
}
}
}
}

View File

@ -8,7 +8,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
internal sealed class LoggingStream : Stream
{

View File

@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@ -30,8 +30,8 @@ namespace Microsoft.AspNetCore.Hosting
public static ListenOptions UseConnectionLogging(this ListenOptions listenOptions, string loggerName)
{
var loggerFactory = listenOptions.KestrelServerOptions.ApplicationServices.GetRequiredService<ILoggerFactory>();
var logger = loggerName == null ? loggerFactory.CreateLogger<LoggingConnectionAdapter>() : loggerFactory.CreateLogger(loggerName);
listenOptions.ConnectionAdapters.Add(new LoggingConnectionAdapter(logger));
var logger = loggerName == null ? loggerFactory.CreateLogger<LoggingConnectionMiddleware>() : loggerFactory.CreateLogger(loggerName);
listenOptions.Use(next => new LoggingConnectionMiddleware(next, logger).OnConnectionAsync);
return listenOptions;
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Net;
@ -22,7 +22,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public void LocalHostListenOptionsClonesConnectionMiddleware()
{
var localhostListenOptions = new LocalhostListenOptions(1004);
localhostListenOptions.ConnectionAdapters.Add(new PassThroughConnectionAdapter());
var serviceProvider = new ServiceCollection().BuildServiceProvider();
localhostListenOptions.KestrelServerOptions = new KestrelServerOptions
{
@ -45,7 +44,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
Assert.NotNull(clone.KestrelServerOptions);
Assert.NotNull(serviceProvider);
Assert.Same(serviceProvider, clone.ApplicationServices);
Assert.Single(clone.ConnectionAdapters);
}
}
}

View File

@ -1,52 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Testing;
using Moq;
using Xunit;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
{
public class PipeOptionsTests
{
[Theory]
[InlineData(10, 10, 10)]
[InlineData(null, 0, 0)]
public void AdaptedInputPipeOptionsConfiguredCorrectly(long? maxRequestBufferSize, long expectedMaximumSizeLow, long expectedMaximumSizeHigh)
{
var serviceContext = new TestServiceContext();
serviceContext.ServerOptions.Limits.MaxRequestBufferSize = maxRequestBufferSize;
var connectionLifetime = new HttpConnection(new HttpConnectionContext
{
ServiceContext = serviceContext
});
Assert.Equal(expectedMaximumSizeLow, connectionLifetime.AdaptedInputPipeOptions.ResumeWriterThreshold);
Assert.Equal(expectedMaximumSizeHigh, connectionLifetime.AdaptedInputPipeOptions.PauseWriterThreshold);
Assert.Same(serviceContext.Scheduler, connectionLifetime.AdaptedInputPipeOptions.ReaderScheduler);
Assert.Same(PipeScheduler.Inline, connectionLifetime.AdaptedInputPipeOptions.WriterScheduler);
}
[Theory]
[InlineData(10, 10, 10)]
[InlineData(null, 0, 0)]
public void AdaptedOutputPipeOptionsConfiguredCorrectly(long? maxRequestBufferSize, long expectedMaximumSizeLow, long expectedMaximumSizeHigh)
{
var serviceContext = new TestServiceContext();
serviceContext.ServerOptions.Limits.MaxResponseBufferSize = maxRequestBufferSize;
var connectionLifetime = new HttpConnection(new HttpConnectionContext
{
ServiceContext = serviceContext
});
Assert.Equal(expectedMaximumSizeLow, connectionLifetime.AdaptedOutputPipeOptions.ResumeWriterThreshold);
Assert.Equal(expectedMaximumSizeHigh, connectionLifetime.AdaptedOutputPipeOptions.PauseWriterThreshold);
Assert.Same(PipeScheduler.Inline, connectionLifetime.AdaptedOutputPipeOptions.ReaderScheduler);
Assert.Same(PipeScheduler.Inline, connectionLifetime.AdaptedOutputPipeOptions.WriterScheduler);
}
}
}

View File

@ -185,7 +185,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
return context.Response.WriteAsync("Hello World");
});
listenOptions.UseHttpServer(listenOptions.ConnectionAdapters, serviceContext, testApplication, HttpProtocols.Http1);
listenOptions.UseHttpServer(serviceContext, testApplication, HttpProtocols.Http1);
var transportContext = new TestLibuvTransportContext
{

View File

@ -2,11 +2,10 @@ using System;
using System.IO;
using System.Net;
using System.Security.Authentication;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
@ -39,7 +38,20 @@ namespace Http2SampleApp
{
listenOptions.Protocols = HttpProtocols.Http1AndHttp2;
listenOptions.UseHttps();
listenOptions.ConnectionAdapters.Add(new TlsFilterAdapter());
listenOptions.Use((context, next) =>
{
// https://tools.ietf.org/html/rfc7540#appendix-A
// Allows filtering TLS handshakes on a per connection basis
var tlsFeature = context.Features.Get<ITlsHandshakeFeature>();
if (tlsFeature.CipherAlgorithm == CipherAlgorithmType.Null)
{
throw new NotSupportedException("Prohibited cipher: " + tlsFeature.CipherAlgorithm);
}
return next();
});
});
// Prior knowledge, no TLS handshake. WARNING: Not supported by browsers
@ -54,38 +66,5 @@ namespace Http2SampleApp
hostBuilder.Build().Run();
}
// https://tools.ietf.org/html/rfc7540#appendix-A
// Allows filtering TLS handshakes on a per connection basis
private class TlsFilterAdapter : IConnectionAdapter
{
public bool IsHttps => false;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
{
var tlsFeature = context.Features.Get<ITlsHandshakeFeature>();
if (tlsFeature.CipherAlgorithm == CipherAlgorithmType.Null)
{
throw new NotSupportedException("Prohibited cipher: " + tlsFeature.CipherAlgorithm);
}
return Task.FromResult<IAdaptedConnection>(new AdaptedConnection(context.ConnectionStream));
}
private class AdaptedConnection : IAdaptedConnection
{
public AdaptedConnection(Stream adaptedStream)
{
ConnectionStream = adaptedStream;
}
public Stream ConnectionStream { get; }
public void Dispose()
{
}
}
}
}
}

View File

@ -2,168 +2,96 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Connections;
namespace Microsoft.AspNetCore.Testing
{
public class PassThroughConnectionAdapter : IConnectionAdapter
public class PassThroughConnectionMiddleware
{
public bool IsHttps => false;
private readonly ConnectionDelegate _next;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
public PassThroughConnectionMiddleware(ConnectionDelegate next)
{
var adapted = new AdaptedConnection(new PassThroughStream(context.ConnectionStream));
return Task.FromResult<IAdaptedConnection>(adapted);
_next = next;
}
private class AdaptedConnection : IAdaptedConnection
public Task OnConnectionAsync(ConnectionContext context)
{
public AdaptedConnection(Stream stream)
{
ConnectionStream = stream;
}
public Stream ConnectionStream { get; }
public void Dispose()
{
}
context.Transport = new PassThroughDuplexPipe(context.Transport);
return _next(context);
}
private class PassThroughStream : Stream
private class PassThroughDuplexPipe : IDuplexPipe
{
private readonly Stream _innerStream;
public PassThroughStream(Stream innerStream)
public PassThroughDuplexPipe(IDuplexPipe duplexPipe)
{
_innerStream = innerStream;
Input = new PassThroughPipeReader(duplexPipe.Input);
Output = new PassThroughPipeWriter(duplexPipe.Output);
}
public override bool CanRead => _innerStream.CanRead;
public PipeReader Input { get; }
public override bool CanSeek => _innerStream.CanSeek;
public PipeWriter Output { get; }
public override bool CanTimeout => _innerStream.CanTimeout;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;
public override long Position { get => _innerStream.Position; set => _innerStream.Position = value; }
public override int ReadTimeout { get => _innerStream.ReadTimeout; set => _innerStream.ReadTimeout = value; }
public override int WriteTimeout { get => _innerStream.WriteTimeout; set => _innerStream.WriteTimeout = value; }
public override int Read(byte[] buffer, int offset, int count)
private class PassThroughPipeWriter : PipeWriter
{
return _innerStream.Read(buffer, offset, count);
private PipeWriter _output;
public PassThroughPipeWriter(PipeWriter output)
{
_output = output;
}
public override void Advance(int bytes) => _output.Advance(bytes);
public override void CancelPendingFlush() => _output.CancelPendingFlush();
public override void Complete(Exception exception = null) => _output.Complete(exception);
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => _output.FlushAsync(cancellationToken);
public override Memory<byte> GetMemory(int sizeHint = 0) => _output.GetMemory(sizeHint);
public override Span<byte> GetSpan(int sizeHint = 0) => _output.GetSpan(sizeHint);
public override void OnReaderCompleted(Action<Exception, object> callback, object state) => _output.OnReaderCompleted(callback, state);
}
public override int ReadByte()
private class PassThroughPipeReader : PipeReader
{
return _innerStream.ReadByte();
}
private PipeReader _input;
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _innerStream.ReadAsync(buffer, offset, count, cancellationToken);
}
public PassThroughPipeReader(PipeReader input)
{
_input = input;
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _innerStream.BeginRead(buffer, offset, count, callback, state);
}
public override void AdvanceTo(SequencePosition consumed) => _input.AdvanceTo(consumed);
public override int EndRead(IAsyncResult asyncResult)
{
return _innerStream.EndRead(asyncResult);
}
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _input.AdvanceTo(consumed, examined);
public override void Write(byte[] buffer, int offset, int count)
{
_innerStream.Write(buffer, offset, count);
}
public override void CancelPendingRead() => _input.CancelPendingRead();
public override void Complete(Exception exception = null) => _input.Complete(exception);
public override void WriteByte(byte value)
{
_innerStream.WriteByte(value);
}
public override void OnWriterCompleted(Action<Exception, object> callback, object state) => _input.OnWriterCompleted(callback, state);
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
}
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) => _input.ReadAsync(cancellationToken);
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return _innerStream.BeginWrite(buffer, offset, count, callback, state);
}
public override void EndWrite(IAsyncResult asyncResult)
{
_innerStream.EndWrite(asyncResult);
}
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return _innerStream.CopyToAsync(destination, bufferSize, cancellationToken);
}
public override void Flush()
{
_innerStream.Flush();
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return _innerStream.FlushAsync();
}
public override long Seek(long offset, SeekOrigin origin)
{
return _innerStream.Seek(offset, origin);
}
public override void SetLength(long value)
{
_innerStream.SetLength(value);
}
public override void Close()
{
_innerStream.Close();
}
public override int Read(Span<byte> buffer)
{
return _innerStream.Read(buffer);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return _innerStream.ReadAsync(buffer, cancellationToken);
}
public override void Write(ReadOnlySpan<byte> buffer)
{
_innerStream.Write(buffer);
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
return _innerStream.WriteAsync(buffer, cancellationToken);
}
public override void CopyTo(Stream destination, int bufferSize)
{
_innerStream.CopyTo(destination, bufferSize);
public override bool TryRead(out ReadResult result) => _input.TryRead(out result);
}
}
}
public static class PassThroughConnectionMiddlewareExtensions
{
public static TBuilder UsePassThrough<TBuilder>(this TBuilder builder) where TBuilder : IConnectionBuilder
{
builder.Use(next => new PassThroughConnectionMiddleware(next).OnConnectionAsync);
return builder;
}
}
}

View File

@ -6,7 +6,6 @@ using System.IO;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging.Testing;
using Xunit;
@ -18,12 +17,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
[Fact]
public async Task ThrowingSynchronousConnectionAdapterDoesNotCrashServer()
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new ThrowingConnectionAdapter() }
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => context => throw new Exception());
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions))
{
@ -47,15 +44,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
await server.StopAsync();
}
}
private class ThrowingConnectionAdapter : IConnectionAdapter
{
public bool IsHttps => false;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
{
throw new Exception();
}
}
}
}

View File

@ -297,7 +297,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
if (useConnectionAdapter)
{
listenOptions.ConnectionAdapters.Add(new PassThroughConnectionAdapter());
listenOptions.UsePassThrough();
}
});

View File

@ -40,10 +40,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
public static TheoryData<ListenOptions> ConnectionAdapterData => new TheoryData<ListenOptions>
{
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)),
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new PassThroughConnectionAdapter() }
}
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)).UsePassThrough()
};
[Theory]
@ -509,7 +506,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
[MemberData(nameof(ConnectionAdapterData))]
public async Task ConnectionClosedTokenFiresOnClientFIN(ListenOptions listenOptions)
{
var testContext = new TestServiceContext(LoggerFactory);
var testContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var appStartedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -546,7 +543,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
[MemberData(nameof(ConnectionAdapterData))]
public async Task ConnectionClosedTokenFiresOnServerFIN(ListenOptions listenOptions)
{
var testContext = new TestServiceContext(LoggerFactory);
var testContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var connectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
using (var server = new TestServer(context =>
@ -583,7 +580,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
[MemberData(nameof(ConnectionAdapterData))]
public async Task ConnectionClosedTokenFiresOnServerAbort(ListenOptions listenOptions)
{
var testContext = new TestServiceContext(LoggerFactory);
var testContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var connectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
using (var server = new TestServer(context =>
@ -628,7 +625,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
// This needs a timeout.
const int applicationAbortedConnectionId = 34;
var testContext = new TestServiceContext(LoggerFactory);
var testContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var readTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var registrationTcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
@ -746,6 +743,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var mockKestrelTrace = new Mock<IKestrelTrace>();
var testContext = new TestServiceContext(LoggerFactory, mockKestrelTrace.Object)
{
ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count,
ServerOptions =
{
Limits =
@ -805,7 +803,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var appStartedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var mockKestrelTrace = new Mock<IKestrelTrace>();
var testContext = new TestServiceContext(LoggerFactory, mockKestrelTrace.Object);
var testContext = new TestServiceContext(LoggerFactory, mockKestrelTrace.Object) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var scratchBuffer = new byte[4096];

View File

@ -36,10 +36,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
public static TheoryData<ListenOptions> ConnectionAdapterData => new TheoryData<ListenOptions>
{
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)),
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new PassThroughConnectionAdapter() }
}
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)).UsePassThrough()
};
[Fact]
@ -160,7 +157,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
{
appCompleted.TrySetException(ex);
}
}, new TestServiceContext(LoggerFactory), listenOptions))
}, new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count }, listenOptions))
{
using (var connection = server.CreateConnection())
{
@ -222,7 +219,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
}
writeTcs.SetException(new Exception("This shouldn't be reached."));
}, new TestServiceContext(LoggerFactory), listenOptions))
}, new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count }, listenOptions))
{
using (var connection = server.CreateConnection())
{
@ -276,6 +273,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
var testContext = new TestServiceContext(LoggerFactory, mockKestrelTrace.Object)
{
ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count,
ServerOptions =
{
Limits =
@ -370,7 +368,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
await requestAborted.Task.DefaultTimeout();
appCompletedTcs.SetResult(null);
}, new TestServiceContext(LoggerFactory), listenOptions))
}, new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count }, listenOptions))
{
using (var connection = server.CreateConnection())
{
@ -424,7 +422,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests
// There's not guarantee that the app even gets invoked in this test. The connection reset can be observed
// as early as accept.
using (var server = new TestServer(context => Task.CompletedTask, new TestServiceContext(LoggerFactory), listenOptions))
var testServiceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
using (var server = new TestServer(context => Task.CompletedTask, testServiceContext, listenOptions))
{
for (var i = 0; i < numConnections; i++)
{

View File

@ -7,10 +7,10 @@ using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTransport;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging.Testing;
@ -20,7 +20,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
{
public class ConnectionAdapterTests : TestApplicationErrorLoggerLoggedTest
{
public static TheoryData<RequestDelegate> EchoAppRequestDelegates =>
new TheoryData<RequestDelegate>
{
@ -32,13 +31,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task CanReadAndWriteWithRewritingConnectionAdapter(RequestDelegate requestDelegate)
{
var adapter = new RewritingConnectionAdapter();
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { adapter }
};
RewritingConnectionMiddleware middleware = null;
var serviceContext = new TestServiceContext(LoggerFactory);
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next =>
{
middleware = new RewritingConnectionMiddleware(next);
return middleware.OnConnectionAsync;
});
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
var sendString = "POST / HTTP/1.0\r\nContent-Length: 12\r\n\r\nHello World?";
@ -57,19 +59,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
Assert.Equal(sendString.Length, adapter.BytesRead);
Assert.Equal(sendString.Length, middleware.BytesRead);
}
[Theory]
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task CanReadAndWriteWithAsyncConnectionAdapter(RequestDelegate requestDelegate)
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new AsyncConnectionAdapter() }
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => new AsyncConnectionMiddleware(next).OnConnectionAsync);
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(requestDelegate, serviceContext, listenOptions))
{
@ -94,12 +94,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task ImmediateFinAfterOnConnectionAsyncClosesGracefully(RequestDelegate requestDelegate)
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new AsyncConnectionAdapter() }
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => new AsyncConnectionMiddleware(next).OnConnectionAsync);
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(requestDelegate, serviceContext, listenOptions))
{
@ -116,12 +114,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task ImmediateFinAfterThrowingClosesGracefully(RequestDelegate requestDelegate)
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new ThrowingConnectionAdapter() }
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => context => throw new InvalidOperationException());
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(requestDelegate, serviceContext, listenOptions))
{
@ -139,12 +135,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task ImmediateShutdownAfterOnConnectionAsyncDoesNotCrash(RequestDelegate requestDelegate)
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new AsyncConnectionAdapter() }
};
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => new AsyncConnectionMiddleware(next).OnConnectionAsync);
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
TestApplicationErrorLogger.ThrowOnUngracefulShutdown = false;
@ -167,13 +161,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[Fact]
public async Task ImmediateShutdownDuringOnConnectionAsyncDoesNotCrash()
{
var waitingConnectionAdapter = new WaitingConnectionAdapter();
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next =>
{
ConnectionAdapters = { waitingConnectionAdapter }
};
return async context =>
{
await tcs.Task;
await next(context);
};
});
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions))
{
@ -181,13 +180,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
using (var connection = server.CreateConnection())
{
var closingMessageTask = TestApplicationErrorLogger.WaitForMessage(m => m.Message.Contains(CoreStrings.ServerShutdownDuringConnectionInitialization));
stopTask = server.StopAsync();
await closingMessageTask.DefaultTimeout();
waitingConnectionAdapter.Complete();
tcs.TrySetResult(null);
}
await stopTask;
@ -198,12 +193,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
[MemberData(nameof(EchoAppRequestDelegates))]
public async Task ThrowingSynchronousConnectionAdapterDoesNotCrashServer(RequestDelegate requestDelegate)
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
var connectionId = "";
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
listenOptions.Use(next => context =>
{
ConnectionAdapters = { new ThrowingConnectionAdapter() }
};
connectionId = context.ConnectionId;
throw new InvalidOperationException();
});
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(requestDelegate, serviceContext, listenOptions))
{
@ -218,18 +216,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
Assert.Contains(TestApplicationErrorLogger.Messages, m => m.Message.Contains($"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}."));
Assert.Contains(TestApplicationErrorLogger.Messages, m => m.Message.Contains("Unhandled exception while processing " + connectionId + "."));
}
[Fact]
public async Task CanFlushAsyncWithConnectionAdapter()
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new PassThroughConnectionAdapter() }
};
.UsePassThrough();
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(async context =>
{
@ -258,11 +254,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
public async Task CanFlushAsyncWithConnectionAdapterPipeWriter()
{
var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new PassThroughConnectionAdapter() }
};
.UsePassThrough();
var serviceContext = new TestServiceContext(LoggerFactory);
var serviceContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1 };
await using (var server = new TestServer(async context =>
{
@ -287,71 +281,67 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
}
}
private class RewritingConnectionAdapter : IConnectionAdapter
private class RewritingConnectionMiddleware
{
private RewritingStream _rewritingStream;
private readonly ConnectionDelegate _next;
public bool IsHttps => false;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
public RewritingConnectionMiddleware(ConnectionDelegate next)
{
_rewritingStream = new RewritingStream(context.ConnectionStream);
return Task.FromResult<IAdaptedConnection>(new AdaptedConnection(_rewritingStream));
_next = next;
}
public async Task OnConnectionAsync(ConnectionContext context)
{
var old = context.Transport;
var duplexPipe = new DuplexPipeStreamAdapter<RewritingStream>(context.Transport, s => new RewritingStream(s));
_rewritingStream = duplexPipe.Stream;
try
{
await using (duplexPipe)
{
context.Transport = duplexPipe;
await _next(context);
}
}
finally
{
context.Transport = old;
}
}
public int BytesRead => _rewritingStream.BytesRead;
}
private class AsyncConnectionAdapter : IConnectionAdapter
private class AsyncConnectionMiddleware
{
public bool IsHttps => false;
private readonly ConnectionDelegate _next;
public async Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
public AsyncConnectionMiddleware(ConnectionDelegate next)
{
_next = next;
}
public async Task OnConnectionAsync(ConnectionContext context)
{
await Task.Yield();
return new AdaptedConnection(new RewritingStream(context.ConnectionStream));
}
}
private class WaitingConnectionAdapter : IConnectionAdapter
{
private TaskCompletionSource<object> _waitingTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var old = context.Transport;
var duplexPipe = new DuplexPipeStreamAdapter<RewritingStream>(context.Transport, s => new RewritingStream(s));
public bool IsHttps => false;
public async Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
{
await _waitingTcs.Task;
return new AdaptedConnection(context.ConnectionStream);
}
public void Complete()
{
_waitingTcs.TrySetResult(null);
}
}
private class ThrowingConnectionAdapter : IConnectionAdapter
{
public bool IsHttps => false;
public Task<IAdaptedConnection> OnConnectionAsync(ConnectionAdapterContext context)
{
throw new Exception();
}
}
private class AdaptedConnection : IAdaptedConnection
{
public AdaptedConnection(Stream adaptedStream)
{
ConnectionStream = adaptedStream;
}
public Stream ConnectionStream { get; }
public void Dispose()
{
try
{
await using (duplexPipe)
{
context.Transport = duplexPipe;
await _next(context);
}
}
finally
{
context.Transport = old;
}
}
}

View File

@ -6,7 +6,7 @@ using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.Http2
{
@ -34,7 +34,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.Http2
{
try
{
await stream.CopyToAsync(new RawStream(null, pipe.Writer), bufferSize: 4096, cancellationToken);
await stream.CopyToAsync(new DuplexPipeStream(null, pipe.Writer), bufferSize: 4096, cancellationToken);
}
catch (OperationCanceledException)
{
@ -49,4 +49,4 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.Http2
}
}
}
}
}

View File

@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests
context.Response.ContentLength = 12;
return context.Response.WriteAsync("Hello World!");
},
new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 1},
new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = 3 },
listenOptions =>
{
listenOptions.UseConnectionLogging();

View File

@ -18,17 +18,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public static TheoryData<ListenOptions> ConnectionAdapterData => new TheoryData<ListenOptions>
{
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)),
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
{
ConnectionAdapters = { new PassThroughConnectionAdapter() }
}
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)).UsePassThrough()
};
[Theory]
[MemberData(nameof(ConnectionAdapterData))]
public async Task ConnectionClosedWhenResponseNotDrainedAtMinimumDataRate(ListenOptions listenOptions)
{
var testContext = new TestServiceContext(LoggerFactory);
var testContext = new TestServiceContext(LoggerFactory) { ExpectedConnectionMiddlewareCount = listenOptions._middleware.Count };
var heartbeatManager = new HeartbeatManager(testContext.ConnectionManager);
var minRate = new MinDataRate(16384, TimeSpan.FromSeconds(2));

View File

@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Testing;
namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTransport
@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.InMemory.FunctionalTests.TestTrans
internal class InMemoryConnection : StreamBackedTestConnection
{
public InMemoryConnection(InMemoryTransportConnection transportConnection)
: base(new RawStream(transportConnection.Output, transportConnection.Input))
: base(new DuplexPipeStream(transportConnection.Output, transportConnection.Input))
{
TransportConnection = transportConnection;
}