diff --git a/KestrelHttpServer.sln b/KestrelHttpServer.sln index 3c10b81d05..210365479a 100644 --- a/KestrelHttpServer.sln +++ b/KestrelHttpServer.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26405.0 +VisualStudioVersion = 15.0.26403.7 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7972A5D6-3385-4127-9277-428506DD44FF}" ProjectSection(SolutionItems) = preProject @@ -26,7 +26,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{0EF2AC test\shared\LifetimeNotImplemented.cs = test\shared\LifetimeNotImplemented.cs test\shared\MockFrameControl.cs = test\shared\MockFrameControl.cs test\shared\MockLogger.cs = test\shared\MockLogger.cs - test\shared\MockSocketOutput.cs = test\shared\MockSocketOutput.cs test\shared\MockSystemClock.cs = test\shared\MockSystemClock.cs test\shared\StringExtensions.cs = test\shared\StringExtensions.cs test\shared\TestApp.cs = test\shared\TestApp.cs diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs index c8240a2e21..b73cbdb50e 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { @@ -15,77 +15,149 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { private const int MinAllocBufferSize = 2048; - private readonly Stream _filteredStream; - private readonly StreamSocketOutput _output; private readonly IKestrelTrace _trace; + private readonly IPipeWriter _transportOutputPipeWriter; + private readonly IPipeReader _transportInputPipeReader; - public AdaptedPipeline( - Stream filteredStream, - IPipe inputPipe, - IPipe outputPipe, - IKestrelTrace trace) + public AdaptedPipeline(IPipeReader transportInputPipeReader, + IPipeWriter transportOutputPipeWriter, + IPipe inputPipe, + IPipe outputPipe, + IKestrelTrace trace) { + _transportInputPipeReader = transportInputPipeReader; + _transportOutputPipeWriter = transportOutputPipeWriter; Input = inputPipe; - _output = new StreamSocketOutput(filteredStream, outputPipe); - _filteredStream = filteredStream; + Output = outputPipe; _trace = trace; } public IPipe Input { get; } - public ISocketOutput Output => _output; + public IPipe Output { get; } - public async Task RunAsync() + public async Task RunAsync(Stream stream) { - try - { - var inputTask = ReadInputAsync(); - var outputTask = _output.WriteOutputAsync(); + var inputTask = ReadInputAsync(stream); + var outputTask = WriteOutputAsync(stream); - await inputTask; - - _output.Dispose(); - - await outputTask; - } - catch (Exception ex) - { - // adaptedPipeline.RunAsync() shouldn't throw, unless filtered stream's WriteAsync throws. - _trace.LogError(0, ex, $"{nameof(AdaptedPipeline)}.{nameof(RunAsync)}"); - } + await inputTask; + await outputTask; } - private async Task ReadInputAsync() + private async Task WriteOutputAsync(Stream stream) { - int bytesRead; + Exception error = null; - do + try { - var block = Input.Writer.Alloc(MinAllocBufferSize); - - try + if (stream == null) { - var array = block.Buffer.GetArray(); + return; + } + + while (true) + { + var readResult = await Output.Reader.ReadAsync(); + var buffer = readResult.Buffer; + try { - bytesRead = await _filteredStream.ReadAsync(array.Array, array.Offset, array.Count); - block.Advance(bytesRead); + if (buffer.IsEmpty && readResult.IsCompleted) + { + break; + } + + if (buffer.IsEmpty) + { + await stream.FlushAsync(); + } + else if (buffer.IsSingleSpan) + { + var array = buffer.First.GetArray(); + await stream.WriteAsync(array.Array, array.Offset, array.Count); + } + else + { + foreach (var memory in buffer) + { + var array = memory.GetArray(); + await stream.WriteAsync(array.Array, array.Offset, array.Count); + } + } } finally { - await block.FlushAsync(); + Output.Reader.Advance(buffer.End); } } - catch (Exception ex) + } + catch (Exception ex) + { + error = ex; + } + finally + { + Output.Reader.Complete(); + _transportOutputPipeWriter.Complete(error); + } + } + + private async Task ReadInputAsync(Stream stream) + { + Exception error = null; + + try + { + if (stream == null) { - Input.Writer.Complete(ex); - - // Don't rethrow the exception. It should be handled by the Pipeline consumer. - return; + // If the stream is null then we're going to abort the connection + throw new ConnectionAbortedException(); } - } while (bytesRead != 0); - Input.Writer.Complete(); + while (true) + { + + var outputBuffer = Input.Writer.Alloc(MinAllocBufferSize); + + var array = outputBuffer.Buffer.GetArray(); + try + { + var bytesRead = await stream.ReadAsync(array.Array, array.Offset, array.Count); + outputBuffer.Advance(bytesRead); + + if (bytesRead == 0) + { + // FIN + break; + } + } + finally + { + outputBuffer.Commit(); + } + + var result = await outputBuffer.FlushAsync(); + + if (result.IsCompleted) + { + break; + } + + } + } + catch (Exception ex) + { + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; + } + finally + { + Input.Writer.Complete(error); + // The application could have ended the input pipe so complete + // the transport pipe as well + _transportInputPipeReader.Complete(); + } } } -} +} \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs index 2423630c91..cec2ba1e75 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/RawStream.cs @@ -5,7 +5,6 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal @@ -13,9 +12,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal public class RawStream : Stream { private readonly IPipeReader _input; - private readonly ISocketOutput _output; + private readonly IPipeWriter _output; - public RawStream(IPipeReader input, ISocketOutput output) + public RawStream(IPipeReader input, IPipeWriter output) { _input = input; _output = output; @@ -71,19 +70,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal public override void Write(byte[] buffer, int offset, int count) { - ArraySegment segment; - if (buffer != null) - { - segment = new ArraySegment(buffer, offset, count); - } - else - { - segment = default(ArraySegment); - } - _output.Write(segment); + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) { ArraySegment segment; if (buffer != null) @@ -94,17 +84,19 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { segment = default(ArraySegment); } - return _output.WriteAsync(segment, cancellationToken: token); + var output = _output.Alloc(); + output.Write(segment); + await output.FlushAsync(token); } public override void Flush() { - _output.Flush(); + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); } public override Task FlushAsync(CancellationToken cancellationToken) { - return _output.FlushAsync(cancellationToken); + return WriteAsync(null, 0, 0, cancellationToken); } private async Task ReadAsync(ArraySegment buffer) @@ -209,11 +201,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal }, tcs, cancellationToken); return tcs.Task; } - - protected override void Dispose(bool disposing) - { - // _output is disposed by ConnectionLifetimeControl - _input.Complete(); - } } -} +} \ No newline at end of file diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs deleted file mode 100644 index 6055f271f5..0000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Adapter/Internal/StreamSocketOutput.cs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal -{ - public class StreamSocketOutput : ISocketOutput - { - private readonly Stream _outputStream; - private readonly IPipe _pipe; - private readonly object _sync = new object(); - private bool _completed; - - public StreamSocketOutput(Stream outputStream, IPipe pipe) - { - _outputStream = outputStream; - _pipe = pipe; - } - - public void Write(ArraySegment buffer, bool chunk) - { - WriteAsync(buffer, chunk, default(CancellationToken)).GetAwaiter().GetResult(); - } - - public async Task WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) - { - var flushAwaiter = default(WritableBufferAwaitable); - - lock (_sync) - { - if (_completed) - { - return; - } - - var writableBuffer = _pipe.Writer.Alloc(1); - var writer = new WritableBufferWriter(writableBuffer); - if (buffer.Count > 0) - { - if (chunk) - { - ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Count); - writer.Write(buffer.Array, buffer.Offset, buffer.Count); - ChunkWriter.WriteEndChunkBytes(ref writer); - } - else - { - writer.Write(buffer.Array, buffer.Offset, buffer.Count); - } - } - - flushAwaiter = writableBuffer.FlushAsync(cancellationToken); - } - - await flushAwaiter; - } - - public void Dispose() - { - lock (_sync) - { - _completed = true; - } - - _pipe.Writer.Complete(); - } - - public void Flush() - { - FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); - } - - public Task FlushAsync(CancellationToken cancellationToken) - { - return WriteAsync(default(ArraySegment), chunk: false, cancellationToken: cancellationToken); - } - - public void Write(Action callback, T state) where T : struct - { - lock (_sync) - { - if (_completed) - { - return; - } - - var buffer = _pipe.Writer.Alloc(1); - callback(buffer, state); - buffer.Commit(); - } - } - - public async Task WriteOutputAsync() - { - try - { - while (true) - { - var readResult = await _pipe.Reader.ReadAsync(); - var buffer = readResult.Buffer; - - try - { - if (buffer.IsEmpty && readResult.IsCompleted) - { - break; - } - - if (buffer.IsEmpty) - { - await _outputStream.FlushAsync(); - } - else if (buffer.IsSingleSpan) - { - var array = buffer.First.GetArray(); - await _outputStream.WriteAsync(array.Array, array.Offset, array.Count); - } - else - { - foreach (var memory in buffer) - { - var array = memory.GetArray(); - await _outputStream.WriteAsync(array.Array, array.Offset, array.Count); - } - } - } - finally - { - _pipe.Reader.Advance(buffer.End); - } - } - } - finally - { - _pipe.Reader.Complete(); - } - } - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs index 921f87424c..523af72020 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/ConnectionHandler.cs @@ -40,10 +40,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal ServiceContext = _serviceContext }; - // TODO: Untangle this mess var frame = new Frame(_application, frameContext); - var outputProducer = new OutputProducer(outputPipe.Writer, frame, connectionId, _serviceContext.Log); - frame.LifetimeControl = new ConnectionLifetimeControl(connectionId, outputPipe.Reader, outputProducer, _serviceContext.Log); var connection = new FrameConnection(new FrameConnectionContext { @@ -55,7 +52,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal Frame = frame, Input = inputPipe, Output = outputPipe, - OutputProducer = outputProducer }); _serviceContext.Log.ConnectionStart(connectionId); diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs index e0d0a9063d..b25f7a983d 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnection.cs @@ -28,7 +28,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal private TimeoutAction _timeoutAction; private Task _lifetimeTask; - private Stream _filteredStream; public FrameConnection(FrameConnectionContext context) { @@ -69,46 +68,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal private async Task ProcessRequestsAsync() { - RawStream rawStream = null; - try { - Task adaptedPipelineTask = Task.CompletedTask; + AdaptedPipeline adaptedPipeline = null; + var adaptedPipelineTask = Task.CompletedTask; + var input = _context.Input.Reader; + var output = _context.Output.Writer; - if (_connectionAdapters.Count == 0) + if (_connectionAdapters.Count > 0) { - _frame.Input = _context.Input.Reader; - _frame.Output = _context.OutputProducer; - } - else - { - rawStream = new RawStream(_context.Input.Reader, _context.OutputProducer); - - try - { - var adaptedPipeline = await ApplyConnectionAdaptersAsync(rawStream); - - _frame.Input = adaptedPipeline.Input.Reader; - _frame.Output = adaptedPipeline.Output; - - adaptedPipelineTask = adaptedPipeline.RunAsync(); - } - catch (Exception ex) - { - Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}."); - - // Since Frame.ProcessRequestsAsync() isn't called, we have to close the socket here. - _context.OutputProducer.Dispose(); - - await _socketClosedTcs.Task; - return; - } + adaptedPipeline = new AdaptedPipeline(_context.Input.Reader, + _context.Output.Writer, + PipeFactory.Create(AdaptedInputPipeOptions), + PipeFactory.Create(AdaptedOutputPipeOptions), + Log); + input = adaptedPipeline.Input.Reader; + output = adaptedPipeline.Output.Writer; } + // Set these before the first await, this is to make sure that we don't yield control + // to the transport until we've added the connection to the connection manager _frame.TimeoutControl = this; - _lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks; + _frame.Input = input; + _frame.Output = new OutputProducer(output, ConnectionId, Log); _context.ServiceContext.ConnectionManager.AddConnection(_context.FrameConnectionId, this); + _lastTimestamp = _context.ServiceContext.SystemClock.UtcNow.Ticks; + + if (adaptedPipeline != null) + { + // Stream can be null here and run async will close the connection in that case + var stream = await ApplyConnectionAdaptersAsync(); + adaptedPipelineTask = adaptedPipeline.RunAsync(stream); + } await _frame.ProcessRequestsAsync(); await adaptedPipelineTask; @@ -121,14 +113,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal finally { _context.ServiceContext.ConnectionManager.RemoveConnection(_context.FrameConnectionId); - rawStream?.Dispose(); DisposeAdaptedConnections(); } } public void OnConnectionClosed(Exception ex) { - // Abort the connection (if it isn't already aborted) + // Abort the connection (if not already aborted) _frame.Abort(ex); Log.ConnectionStop(ConnectionId); @@ -139,17 +130,21 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public Task StopAsync() { _frame.Stop(); + return _lifetimeTask; } public void Abort(Exception ex) { + // Abort the connection (if not already aborted) _frame.Abort(ex); } public Task AbortAsync(Exception ex) { + // Abort the connection (if not already aborted) _frame.Abort(ex); + return _lifetimeTask; } @@ -158,25 +153,33 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal _frame.SetBadRequestState(RequestRejectionReason.RequestTimeout); } - private async Task ApplyConnectionAdaptersAsync(RawStream rawStream) + private async Task ApplyConnectionAdaptersAsync() { - var adapterContext = new ConnectionAdapterContext(rawStream); + var stream = new RawStream(_context.Input.Reader, _context.Output.Writer); + var adapterContext = new ConnectionAdapterContext(stream); var adaptedConnections = new IAdaptedConnection[_connectionAdapters.Count]; - for (var i = 0; i < _connectionAdapters.Count; i++) + try { - var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext); - adaptedConnections[i] = adaptedConnection; - adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream); + for (var i = 0; i < _connectionAdapters.Count; i++) + { + var adaptedConnection = await _connectionAdapters[i].OnConnectionAsync(adapterContext); + adaptedConnections[i] = adaptedConnection; + adapterContext = new ConnectionAdapterContext(adaptedConnection.ConnectionStream); + } + } + catch (Exception ex) + { + Log.LogError(0, ex, $"Uncaught exception from the {nameof(IConnectionAdapter.OnConnectionAsync)} method of an {nameof(IConnectionAdapter)}."); + + return null; + } + finally + { + _frame.AdaptedConnections = adaptedConnections; } - _filteredStream = adapterContext.ConnectionStream; - _frame.AdaptedConnections = adaptedConnections; - - return new AdaptedPipeline(_filteredStream, - PipeFactory.Create(AdaptedInputPipeOptions), - PipeFactory.Create(AdaptedOutputPipeOptions), - Log); + return adapterContext.ConnectionStream; } private void DisposeAdaptedConnections() diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs index 1394f6c150..d9a7c38148 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/FrameConnectionContext.cs @@ -16,7 +16,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public PipeFactory PipeFactory { get; set; } public List ConnectionAdapters { get; set; } public Frame Frame { get; set; } - public OutputProducer OutputProducer { get; set; } public IPipe Input { get; set; } public IPipe Output { get; set; } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs deleted file mode 100644 index 1d6efcd6db..0000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ConnectionLifetimeControl.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http -{ - public class ConnectionLifetimeControl - { - public ConnectionLifetimeControl( - string connectionId, - IPipeReader outputPipeReader, - OutputProducer outputProducer, - IKestrelTrace log) - { - ConnectionId = connectionId; - OutputReader = outputPipeReader; - OutputProducer = outputProducer; - Log = log; - } - - private string ConnectionId { get; } - private IPipeReader OutputReader { get; } - private OutputProducer OutputProducer { get; } - private IKestrelTrace Log { get; } - - public void End(ProduceEndType endType) - { - switch (endType) - { - case ProduceEndType.ConnectionKeepAlive: - Log.ConnectionKeepAlive(ConnectionId); - break; - case ProduceEndType.SocketShutdown: - OutputReader.CancelPendingRead(); - goto case ProduceEndType.SocketDisconnect; - case ProduceEndType.SocketDisconnect: - OutputProducer.Dispose(); - Log.ConnectionDisconnect(ConnectionId); - break; - } - } - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs index 417fadb671..6a85d64757 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/Frame.cs @@ -102,9 +102,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public IConnectionInformation ConnectionInformation => _frameContext.ConnectionInformation; public IPipeReader Input { get; set; } - public ISocketOutput Output { get; set; } + public OutputProducer Output { get; set; } public IAdaptedConnection[] AdaptedConnections { get; set; } - public ConnectionLifetimeControl LifetimeControl { get; set; } public ITimeoutControl TimeoutControl { get; set; } protected IKestrelTrace Log => ServiceContext.Log; @@ -411,7 +410,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _frameStreams?.Abort(error); - LifetimeControl.End(ProduceEndType.SocketDisconnect); + Output.Abort(); // Potentially calling user code. CancelRequestAbortedToken logs any exceptions. ServiceContext.ThreadPool.UnsafeRun(state => ((Frame)state).CancelRequestAbortedToken(), this); @@ -827,7 +826,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http if (_keepAlive) { - LifetimeControl.End(ProduceEndType.ConnectionKeepAlive); + Log.ConnectionKeepAlive(ConnectionId); } if (HttpMethods.IsHead(Method) && _responseBytesWritten > 0) @@ -847,7 +846,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http if (_keepAlive) { - LifetimeControl.End(ProduceEndType.ConnectionKeepAlive); + Log.ConnectionKeepAlive(ConnectionId); } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs index 3be6a58c83..b0890e17ae 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/FrameOfT.cs @@ -229,7 +229,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http if (Volatile.Read(ref _requestAborted) == 0) { await TryProduceInvalidRequestResponse(); - LifetimeControl.End(ProduceEndType.SocketShutdown); + Output.Dispose(); } } catch (Exception ex) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs deleted file mode 100644 index 139a5000a0..0000000000 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/ISocketOutput.cs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http -{ - /// - /// Operations performed for buffered socket output - /// - public interface ISocketOutput - { - void Write(ArraySegment buffer, bool chunk = false); - Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)); - void Flush(); - Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)); - void Write(Action write, T state) where T : struct; - } -} diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs index c5fe3dee51..0d024d6a7f 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Core/Internal/Http/OutputProducer.cs @@ -6,11 +6,12 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.Extensions.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { - public class OutputProducer : ISocketOutput, IDisposable + public class OutputProducer : IDisposable { private static readonly ArraySegment _emptyData = new ArraySegment(new byte[0]); @@ -24,7 +25,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private bool _completed = false; private readonly IPipeWriter _pipe; - private readonly Frame _frame; // https://github.com/dotnet/corefxlab/issues/1334 // Pipelines don't support multiple awaiters on flush @@ -33,16 +33,90 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private readonly object _flushLock = new object(); private Action _flushCompleted; - public OutputProducer(IPipeWriter pipe, Frame frame, string connectionId, IKestrelTrace log) + public OutputProducer(IPipeWriter pipe, string connectionId, IKestrelTrace log) { _pipe = pipe; - _frame = frame; _connectionId = connectionId; _log = log; _flushCompleted = OnFlushCompleted; } - public Task WriteAsync( + public void Write(ArraySegment buffer, bool chunk = false) + { + WriteAsync(buffer, default(CancellationToken), chunk).GetAwaiter().GetResult(); + } + + public Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)) + { + if (cancellationToken.IsCancellationRequested) + { + _cancelled = true; + return Task.FromCanceled(cancellationToken); + } + else if (_cancelled) + { + return TaskCache.CompletedTask; + } + + return WriteAsync(buffer, cancellationToken, chunk); + } + + public void Flush() + { + WriteAsync(_emptyData, default(CancellationToken)).GetAwaiter().GetResult(); + } + + public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + return WriteAsync(_emptyData, cancellationToken); + } + + public void Write(Action callback, T state) + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + var buffer = _pipe.Alloc(1); + callback(buffer, state); + buffer.Commit(); + } + } + + public void Dispose() + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + _log.ConnectionDisconnect(_connectionId); + _completed = true; + _pipe.Complete(); + } + } + + public void Abort() + { + lock (_contextLock) + { + if (_completed) + { + return; + } + + _log.ConnectionDisconnect(_connectionId); + _completed = true; + _pipe.Complete(new ConnectionAbortedException()); + } + } + + private Task WriteAsync( ArraySegment buffer, CancellationToken cancellationToken, bool chunk = false) @@ -108,11 +182,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } await _flushTcs.Task; - if (cancellationToken.IsCancellationRequested) - { - _frame.Abort(error: null); - } - cancellationToken.ThrowIfCancellationRequested(); } @@ -120,60 +189,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { _flushTcs.TrySetResult(null); } - - void ISocketOutput.Write(ArraySegment buffer, bool chunk) - { - WriteAsync(buffer, default(CancellationToken), chunk).GetAwaiter().GetResult(); - } - - Task ISocketOutput.WriteAsync(ArraySegment buffer, bool chunk, CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - _frame.Abort(error: null); - _cancelled = true; - return Task.FromCanceled(cancellationToken); - } - else if (_cancelled) - { - return TaskCache.CompletedTask; - } - - return WriteAsync(buffer, cancellationToken, chunk); - } - - void ISocketOutput.Flush() - { - WriteAsync(_emptyData, default(CancellationToken)).GetAwaiter().GetResult(); - } - - Task ISocketOutput.FlushAsync(CancellationToken cancellationToken) - { - return WriteAsync(_emptyData, cancellationToken); - } - - void ISocketOutput.Write(Action callback, T state) - { - lock (_contextLock) - { - if (_completed) - { - return; - } - - var buffer = _pipe.Alloc(1); - callback(buffer, state); - buffer.Commit(); - } - } - - public void Dispose() - { - lock (_contextLock) - { - _completed = true; - _pipe.Complete(); - } - } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs index d00610fa53..30b151a2c2 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs @@ -4,6 +4,7 @@ using System; using System.Threading.Tasks; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal @@ -36,48 +37,50 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal while (true) { - var result = await _pipe.ReadAsync(); - var buffer = result.Buffer; - var consumed = buffer.End; - try { - if (!buffer.IsEmpty) + var result = await _pipe.ReadAsync(); + var buffer = result.Buffer; + var consumed = buffer.End; + + try { - var writeReq = pool.Allocate(); - - try + if (!buffer.IsEmpty) { - var writeResult = await writeReq.WriteAsync(_socket, buffer); + var writeReq = pool.Allocate(); - LogWriteInfo(writeResult.Status, writeResult.Error); - - if (writeResult.Error != null) + try { - consumed = buffer.Start; - throw writeResult.Error; + var writeResult = await writeReq.WriteAsync(_socket, buffer); + + LogWriteInfo(writeResult.Status, writeResult.Error); + + if (writeResult.Error != null) + { + consumed = buffer.Start; + throw writeResult.Error; + } + } + finally + { + // Make sure we return the writeReq to the pool + pool.Return(writeReq); } } - finally + + if (buffer.IsEmpty && result.IsCompleted) { - // Make sure we return the writeReq to the pool - pool.Return(writeReq); + break; } } - - if (result.IsCancelled) + finally { - break; - } - - if (buffer.IsEmpty && result.IsCompleted) - { - break; + _pipe.Advance(consumed); } } - finally + catch (ConnectionAbortedException) { - _pipe.Advance(consumed); + break; } } } diff --git a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs index 6d5c7e7cf6..28685c3b60 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets/SocketConnection.cs @@ -216,6 +216,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets { error = null; } + catch (ConnectionAbortedException) + { + error = null; + } catch (IOException ex) { error = ex; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs index 43e076be7f..eb2c93f583 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/FrameTests.cs @@ -54,6 +54,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests { _pipelineFactory = new PipeFactory(); _input = _pipelineFactory.Create(); + var output = _pipelineFactory.Create(); _serviceContext = new TestServiceContext(); @@ -66,10 +67,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests _frame = new TestFrame(application: null, context: _frameContext) { Input = _input.Reader, - Output = new MockSocketOutput(), TimeoutControl = Mock.Of() }; + _frame.Output = new OutputProducer(output.Writer, "", Mock.Of()); + _frame.Reset(); } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs index e96126d087..1e8ae196bd 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/OutputProducerTests.cs @@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests var called = false; - ((ISocketOutput)socketOutput).Write((buffer, state) => + socketOutput.Write((buffer, state) => { called = true; }, @@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests var pipe = _pipeFactory.Create(pipeOptions); var serviceContext = new TestServiceContext(); var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); - var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log); + var socketOutput = new OutputProducer(pipe.Writer, "0", serviceContext.Log); return socketOutput; } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs deleted file mode 100644 index 0a3f383dd9..0000000000 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Core.Tests/StreamSocketOutputTests.cs +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Xunit; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests -{ - public class StreamSocketOutputTests - { - [Fact] - public void DoesNotThrowForNullBuffers() - { - // This test was added because SslStream throws if passed null buffers with (count == 0) - // Which happens if ProduceEnd is called in Frame without _responseStarted == true - // As it calls ProduceStart with write immediate == true - // This happens in WebSocket Upgrade over SSL - using (var factory = new PipeFactory()) - { - var socketOutput = new StreamSocketOutput(new ThrowsOnNullWriteStream(), factory.Create()); - - // Should not throw - socketOutput.Write(default(ArraySegment), true); - - Assert.True(true); - - socketOutput.Dispose(); - } - } - - private class ThrowsOnNullWriteStream : Stream - { - public override bool CanRead - { - get - { - throw new NotImplementedException(); - } - } - - public override bool CanSeek - { - get - { - throw new NotImplementedException(); - } - } - - public override bool CanWrite - { - get - { - throw new NotImplementedException(); - } - } - - public override long Length - { - get - { - throw new NotImplementedException(); - } - } - - public override long Position - { - get - { - throw new NotImplementedException(); - } - - set - { - throw new NotImplementedException(); - } - } - - public override void Flush() - { - throw new NotImplementedException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - throw new NotImplementedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override void SetLength(long value) - { - throw new NotImplementedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - } - } - } -} diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs index 906e81ae76..eb5a857358 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/ConnectionAdapterTests.cs @@ -4,6 +4,7 @@ using System; using System.IO; using System.Net; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http.Features; @@ -76,6 +77,70 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests } } + [Fact] + public async Task ImmediateFinAfterOnConnectionAsyncClosesGracefully() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new AsyncConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + // FIN + connection.Shutdown(SocketShutdown.Send); + await connection.WaitForConnectionClose(); + } + } + } + + [Fact] + public async Task ImmediateFinAfterThrowingClosesGracefully() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new ThrowingConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + // FIN + connection.Shutdown(SocketShutdown.Send); + await connection.WaitForConnectionClose(); + } + } + } + + [Fact] + public async Task ImmediateShutdownAfterOnConnectionAsyncDoesNotCrash() + { + var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)) + { + ConnectionAdapters = { new AsyncConnectionAdapter() } + }; + + var serviceContext = new TestServiceContext(); + + var stopTask = Task.CompletedTask; + using (var server = new TestServer(TestApp.EchoApp, serviceContext, listenOptions)) + { + using (var connection = server.CreateConnection()) + { + stopTask = server.StopAsync(); + } + + await stopTask; + } + } + [Fact] public async Task ThrowingSynchronousConnectionAdapterDoesNotCrashServer() { @@ -121,7 +186,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests } public int BytesRead => _rewritingStream.BytesRead; - } + } private class AsyncConnectionAdapter : IConnectionAdapter { diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs index c4eb51dfdf..5c58b48664 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/RequestTests.cs @@ -1051,24 +1051,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests { using (var connection = server.CreateConnection()) { - // Never send the body so CopyToAsync always fails. + // Full request and response await connection.Send( "POST / HTTP/1.1", "Host:", "Content-Length: 5", "", - "HelloPOST / HTTP/1.1", - "Host:", - "Content-Length: 5", - "", - ""); + "Hello"); - await connection.ReceiveForcedEnd( + await connection.Receive( "HTTP/1.1 200 OK", $"Date: {testContext.DateHeaderValue}", "Content-Length: 5", "", "World"); + + // Never send the body so CopyToAsync always fails. + await connection.Send("POST / HTTP/1.1", + "Host:", + "Content-Length: 5", + "", + ""); + await connection.WaitForConnectionClose(); } } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs index 73f2adfbf9..930c21cd08 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.FunctionalTests/TestServer.cs @@ -5,6 +5,7 @@ using System; using System.Net; using System.Net.Sockets; using System.Reflection; +using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting.Server; @@ -108,6 +109,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.FunctionalTests return new TestConnection(Port, AddressFamily); } + public Task StopAsync() + { + return _host.StopAsync(); + } + public void Dispose() { _host.Dispose(); diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs index b735e76ff9..bef07b9063 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/FrameWritingBenchmark.cs @@ -87,7 +87,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance private TestFrame MakeFrame() { - var socketInput = new PipeFactory().Create(); + var factory = new PipeFactory(); + var input = factory.Create(); + var output = factory.Create(); var serviceContext = new ServiceContext { @@ -104,10 +106,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance var frame = new TestFrame(application: null, context: frameContext) { - Input = socketInput.Reader, - Output = new MockSocketOutput() + Input = input.Reader, }; + frame.Output = new OutputProducer(output.Writer, "", null); + frame.Reset(); return frame; diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj index 54cd499106..42fcca542b 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/Microsoft.AspNetCore.Server.Kestrel.Performance.csproj @@ -1,4 +1,4 @@ - + @@ -14,7 +14,6 @@ - diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs index 012deb1b70..8f28af9ef6 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Performance/ResponseHeadersWritingBenchmark.cs @@ -8,7 +8,6 @@ using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Microsoft.AspNetCore.Server.Kestrel.Core; -using Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; @@ -113,7 +112,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance var factory = new PipeFactory(); var input = factory.Create(); var output = factory.Create(); - var socketOutput = new StreamSocketOutput(Stream.Null, output); var serviceContext = new ServiceContext { @@ -129,19 +127,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Performance ConnectionInformation = new MockConnectionInformation() }; - var outputProducer = new OutputProducer(output.Writer, null, null, null); var frame = new TestFrame(application: null, context: frameContext) { Input = input.Reader, - Output = socketOutput, - LifetimeControl = new ConnectionLifetimeControl(null, output.Reader, outputProducer, serviceContext.Log) }; + frame.Output = new OutputProducer(output.Writer, "", null); frame.Reset(); - // Start writing - var ignore = socketOutput.WriteOutputAsync(); - _frame = frame; } diff --git a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs index bbaaea6054..13b3706657 100644 --- a/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs +++ b/test/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests/LibuvOutputConsumerTests.cs @@ -9,7 +9,6 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking; using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers; @@ -75,7 +74,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert await writeTask.TimeoutAfter(TimeSpan.FromSeconds(5)); @@ -110,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert await writeTask.TimeoutAfter(TimeSpan.FromSeconds(5)); @@ -156,7 +155,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask = socketOutput.WriteAsync(buffer); // Assert Assert.False(writeTask.IsCompleted); @@ -211,14 +210,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act - var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(buffer); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. Assert.Equal(TaskStatus.RanToCompletion, writeTask1.Status); // Act - var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(buffer); await _mockLibuv.OnPostTask; // Assert @@ -275,7 +274,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var halfWriteBehindBuffer = new ArraySegment(data, 0, bufferSize); // Act - var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(halfWriteBehindBuffer); // Assert // The first write should pre-complete since it is <= _maxBytesPreCompleted. @@ -284,17 +283,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.NotEmpty(completeQueue); // Add more bytes to the write-behind buffer to prevent the next write from - ((ISocketOutput)socketOutput).Write((writableBuffer, state) => + socketOutput.Write((writableBuffer, state) => { writableBuffer.Write(state); }, halfWriteBehindBuffer); // Act - var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(halfWriteBehindBuffer); Assert.False(writeTask2.IsCompleted); - var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer, default(CancellationToken)); + var writeTask3 = socketOutput.WriteAsync(halfWriteBehindBuffer); Assert.False(writeTask3.IsCompleted); // Drain the write queue @@ -345,7 +344,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful @@ -354,8 +353,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task1Success.IsFaulted); // following tasks should wait. - var task2Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task2Success = socketOutput.WriteAsync(fullBuffer); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -383,7 +382,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: default(CancellationToken)); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -429,7 +428,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Success = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task1Success = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // task1 should complete successfully as < _maxBytesPreCompleted // First task is completed and successful @@ -438,7 +437,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task1Success.IsFaulted); // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -458,7 +457,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -504,7 +503,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var fullBuffer = new ArraySegment(data, 0, bufferSize); // Act - var task1Waits = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task1Waits = socketOutput.WriteAsync(fullBuffer); // First task is not completed Assert.False(task1Waits.IsCompleted); @@ -512,7 +511,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.False(task1Waits.IsFaulted); // following tasks should wait. - var task3Canceled = socketOutput.WriteAsync(fullBuffer, abortedSource.Token); + var task3Canceled = socketOutput.WriteAsync(fullBuffer, cancellationToken: abortedSource.Token); // Give time for tasks to percolate await _mockLibuv.OnPostTask; @@ -537,7 +536,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // A final write guarantees that the error is observed by OutputProducer, // but doesn't return a canceled/faulted task. - var task4Success = socketOutput.WriteAsync(fullBuffer, default(CancellationToken)); + var task4Success = socketOutput.WriteAsync(fullBuffer); Assert.True(task4Success.IsCompleted); Assert.False(task4Success.IsCanceled); Assert.False(task4Success.IsFaulted); @@ -575,7 +574,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); // Act (Pre-complete the maximum number of bytes in preparation for the rest of the test) - var writeTask1 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask1 = socketOutput.WriteAsync(buffer); // Assert // The first write should pre-complete since it is < _maxBytesPreCompleted. @@ -584,8 +583,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests Assert.NotEmpty(completeQueue); // Act - var writeTask2 = socketOutput.WriteAsync(buffer, default(CancellationToken)); - var writeTask3 = socketOutput.WriteAsync(buffer, default(CancellationToken)); + var writeTask2 = socketOutput.WriteAsync(buffer); + var writeTask3 = socketOutput.WriteAsync(buffer); // Drain the write queue while (completeQueue.TryDequeue(out var triggerNextCompleted)) @@ -635,8 +634,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests // Two calls to WriteAsync trigger uv_write once if both calls // are made before write is scheduled - var ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); - ignore = socketOutput.WriteAsync(buffer, CancellationToken.None); + var ignore = socketOutput.WriteAsync(buffer); + ignore = socketOutput.WriteAsync(buffer); _mockLibuv.KestrelThreadBlocker.Set(); @@ -671,10 +670,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var frame = new Frame(null, new FrameContext { ServiceContext = serviceContext }); var socket = new MockSocket(_mockLibuv, _libuvThread.Loop.ThreadId, transportContext.Log); - var socketOutput = new OutputProducer(pipe.Writer, frame, "0", serviceContext.Log); + var outputProducer = new OutputProducer(pipe.Writer, "0", serviceContext.Log); var consumer = new LibuvOutputConsumer(pipe.Reader, _libuvThread, socket, "0", transportContext.Log); - - frame.LifetimeControl = new ConnectionLifetimeControl("0", pipe.Reader, socketOutput, serviceContext.Log); + frame.Output = outputProducer; if (cts != null) { @@ -683,7 +681,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var ignore = WriteOutputAsync(consumer, pipe.Reader, frame); - return socketOutput; + return outputProducer; } private async Task WriteOutputAsync(LibuvOutputConsumer consumer, IPipeReader outputReader, Frame frame) diff --git a/test/shared/MockSocketOutput.cs b/test/shared/MockSocketOutput.cs deleted file mode 100644 index 9089bca83d..0000000000 --- a/test/shared/MockSocketOutput.cs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines; -using Microsoft.Extensions.Internal; - -namespace Microsoft.AspNetCore.Testing -{ - public class MockSocketOutput : ISocketOutput - { - public MockSocketOutput() - { - } - - public void Write(ArraySegment buffer, bool chunk = false) - { - } - - public Task WriteAsync(ArraySegment buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken)) - { - return TaskCache.CompletedTask; - } - - public void Flush() - { - } - - public Task FlushAsync(CancellationToken cancellationToken = new CancellationToken()) - { - return TaskCache.CompletedTask; - } - - public void Write(Action write, T state) where T : struct - { - - } - } -}