diff --git a/src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs b/src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs index fd8c35f037..4204469241 100644 --- a/src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs +++ b/src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs @@ -79,15 +79,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal } else if (buffer.IsSingleSegment) { +#if NETCOREAPP2_1 + await stream.WriteAsync(buffer.First); +#else var array = buffer.First.GetArray(); await stream.WriteAsync(array.Array, array.Offset, array.Count); +#endif } else { foreach (var memory in buffer) { +#if NETCOREAPP2_1 + await stream.WriteAsync(memory); +#else var array = memory.GetArray(); await stream.WriteAsync(array.Array, array.Offset, array.Count); +#endif } } } @@ -125,10 +133,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal var outputBuffer = Input.Writer.GetMemory(MinAllocBufferSize); - var array = outputBuffer.GetArray(); try { +#if NETCOREAPP2_1 + var bytesRead = await stream.ReadAsync(outputBuffer); +#else + var array = outputBuffer.GetArray(); var bytesRead = await stream.ReadAsync(array.Array, array.Offset, array.Count); +#endif Input.Writer.Advance(bytesRead); if (bytesRead == 0) diff --git a/src/Kestrel.Core/Adapter/Internal/LoggingStream.cs b/src/Kestrel.Core/Adapter/Internal/LoggingStream.cs index 584bf6301d..62f62a5d4c 100644 --- a/src/Kestrel.Core/Adapter/Internal/LoggingStream.cs +++ b/src/Kestrel.Core/Adapter/Internal/LoggingStream.cs @@ -79,17 +79,35 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal public override int Read(byte[] buffer, int offset, int count) { int read = _inner.Read(buffer, offset, count); - Log("Read", read, buffer, offset); + Log("Read", new ReadOnlySpan(buffer, offset, read)); return read; } +#if NETCOREAPP2_1 + public override int Read(Span destination) + { + int read = _inner.Read(destination); + Log("Read", destination.Slice(0, read)); + return read; + } +#endif + public async override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int read = await _inner.ReadAsync(buffer, offset, count, cancellationToken); - Log("ReadAsync", read, buffer, offset); + Log("ReadAsync", new ReadOnlySpan(buffer, offset, read)); return read; } +#if NETCOREAPP2_1 + public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + int read = await _inner.ReadAsync(destination, cancellationToken); + Log("ReadAsync", destination.Span.Slice(0, read)); + return read; + } +#endif + public override long Seek(long offset, SeekOrigin origin) { return _inner.Seek(offset, origin); @@ -102,29 +120,45 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal public override void Write(byte[] buffer, int offset, int count) { - Log("Write", count, buffer, offset); + Log("Write", new ReadOnlySpan(buffer, offset, count)); _inner.Write(buffer, offset, count); } +#if NETCOREAPP2_1 + public override void Write(ReadOnlySpan source) + { + Log("Write", source); + _inner.Write(source); + } +#endif + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - Log("WriteAsync", count, buffer, offset); + Log("WriteAsync", new ReadOnlySpan(buffer, offset, count)); return _inner.WriteAsync(buffer, offset, count, cancellationToken); } - private void Log(string method, int count, byte[] buffer, int offset) +#if NETCOREAPP2_1 + public override Task WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { - var builder = new StringBuilder($"{method}[{count}] "); + Log("WriteAsync", source.Span); + return _inner.WriteAsync(source, cancellationToken); + } +#endif + + private void Log(string method, ReadOnlySpan buffer) + { + var builder = new StringBuilder($"{method}[{buffer.Length}] "); // Write the hex - for (int i = offset; i < offset + count; i++) + for (int i = 0; i < buffer.Length; i++) { builder.Append(buffer[i].ToString("X2")); builder.Append(" "); } builder.AppendLine(); // Write the bytes as if they were ASCII - for (int i = offset; i < offset + count; i++) + for (int i = 0; i < buffer.Length; i++) { builder.Append((char)buffer[i]); } diff --git a/src/Kestrel.Core/Adapter/Internal/RawStream.cs b/src/Kestrel.Core/Adapter/Internal/RawStream.cs index 87a0932c32..32fb757db2 100644 --- a/src/Kestrel.Core/Adapter/Internal/RawStream.cs +++ b/src/Kestrel.Core/Adapter/Internal/RawStream.cs @@ -61,29 +61,44 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal { // ValueTask uses .GetAwaiter().GetResult() if necessary // https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156 - return ReadAsync(new ArraySegment(buffer, offset, count)).Result; + return ReadAsyncInternal(new Memory(buffer, offset, count)).Result; } public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - return ReadAsync(new ArraySegment(buffer, offset, count)); + return ReadAsyncInternal(new Memory(buffer, offset, count)).AsTask(); } +#if NETCOREAPP2_1 + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination); + } +#endif + public override void Write(byte[] buffer, int offset, int count) { WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (buffer != null) { _output.Write(new ReadOnlySpan(buffer, offset, count)); } - await _output.FlushAsync(token); + await _output.FlushAsync(cancellationToken); } +#if NETCOREAPP2_1 + public override async Task WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + _output.Write(source.Span); + await _output.FlushAsync(cancellationToken); + } +#endif + public override void Flush() { FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); @@ -94,7 +109,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal return WriteAsync(null, 0, 0, cancellationToken); } - private async Task ReadAsync(ArraySegment buffer) + private async ValueTask ReadAsyncInternal(Memory destination) { while (true) { @@ -105,9 +120,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal if (!readableBuffer.IsEmpty) { // buffer.Count is int - var count = (int) Math.Min(readableBuffer.Length, buffer.Count); + var count = (int) Math.Min(readableBuffer.Length, destination.Length); readableBuffer = readableBuffer.Slice(0, count); - readableBuffer.CopyTo(buffer); + readableBuffer.CopyTo(destination.Span); return count; } else if (result.IsCompleted) diff --git a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs index f220e7c70f..dbc8cf0f52 100644 --- a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs @@ -14,10 +14,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { public class Http1OutputProducer : IHttpOutputProducer { - private static readonly ArraySegment _continueBytes = new ArraySegment(Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n")); + private static readonly ReadOnlyMemory _continueBytes = new ReadOnlyMemory(Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n")); private static readonly byte[] _bytesHttpVersion11 = Encoding.ASCII.GetBytes("HTTP/1.1 "); private static readonly byte[] _bytesEndHeaders = Encoding.ASCII.GetBytes("\r\n\r\n"); - private static readonly ArraySegment _endChunkedResponseBytes = new ArraySegment(Encoding.ASCII.GetBytes("0\r\n\r\n")); + private static readonly ReadOnlyMemory _endChunkedResponseBytes = new ReadOnlyMemory(Encoding.ASCII.GetBytes("0\r\n\r\n")); private readonly string _connectionId; private readonly ITimeoutControl _timeoutControl; @@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http _flushCompleted = OnFlushCompleted; } - public Task WriteDataAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) + public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default(CancellationToken)) { if (cancellationToken.IsCancellationRequested) { @@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public Task WriteStreamSuffixAsync(CancellationToken cancellationToken) { - return WriteAsync(_endChunkedResponseBytes, cancellationToken); + return WriteAsync(_endChunkedResponseBytes.Span, cancellationToken); } public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) @@ -160,11 +160,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public Task Write100ContinueAsync(CancellationToken cancellationToken) { - return WriteAsync(_continueBytes, default(CancellationToken)); + return WriteAsync(_continueBytes.Span, default(CancellationToken)); } private Task WriteAsync( - ArraySegment buffer, + ReadOnlySpan buffer, CancellationToken cancellationToken) { var writableBuffer = default(PipeWriter); @@ -178,10 +178,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http writableBuffer = _pipeWriter; var writer = OutputWriter.Create(writableBuffer); - if (buffer.Count > 0) + if (buffer.Length > 0) { - writer.Write(new ReadOnlySpan(buffer.Array, buffer.Offset, buffer.Count)); - bytesWritten += buffer.Count; + writer.Write(buffer); + bytesWritten += buffer.Length; } writableBuffer.Commit(); diff --git a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs index d0e90e1054..45bfa675ca 100644 --- a/src/Kestrel.Core/Internal/Http/HttpProtocol.cs +++ b/src/Kestrel.Core/Internal/Http/HttpProtocol.cs @@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private static readonly byte[] _bytesConnectionKeepAlive = Encoding.ASCII.GetBytes("\r\nConnection: keep-alive"); private static readonly byte[] _bytesTransferEncodingChunked = Encoding.ASCII.GetBytes("\r\nTransfer-Encoding: chunked"); private static readonly byte[] _bytesServer = Encoding.ASCII.GetBytes("\r\nServer: " + Constants.ServerName); - private static readonly Action> _writeChunk = WriteChunk; + private static readonly Action> _writeChunk = WriteChunk; private readonly object _onStartingSync = new Object(); private readonly object _onCompletedSync = new Object(); @@ -762,14 +762,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http await Output.FlushAsync(cancellationToken); } - public Task WriteAsync(ArraySegment data, CancellationToken cancellationToken = default(CancellationToken)) + public Task WriteAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default(CancellationToken)) { // For the first write, ensure headers are flushed if WriteDataAsync isn't called. var firstWrite = !HasResponseStarted; if (firstWrite) { - var initializeTask = InitializeResponseAsync(data.Count); + var initializeTask = InitializeResponseAsync(data.Length); // If return is Task.CompletedTask no awaiting is required if (!ReferenceEquals(initializeTask, Task.CompletedTask)) { @@ -778,14 +778,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } else { - VerifyAndUpdateWrite(data.Count); + VerifyAndUpdateWrite(data.Length); } if (_canHaveBody) { if (_autoChunk) { - if (data.Count == 0) + if (data.Length == 0) { return !firstWrite ? Task.CompletedTask : FlushAsync(cancellationToken); } @@ -794,7 +794,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http else { CheckLastWrite(); - return Output.WriteDataAsync(data, cancellationToken: cancellationToken); + return Output.WriteDataAsync(data.Span, cancellationToken: cancellationToken); } } else @@ -804,7 +804,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } } - public async Task WriteAsyncAwaited(Task initializeTask, ArraySegment data, CancellationToken cancellationToken) + public async Task WriteAsyncAwaited(Task initializeTask, ReadOnlyMemory data, CancellationToken cancellationToken) { await initializeTask; @@ -814,7 +814,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { if (_autoChunk) { - if (data.Count == 0) + if (data.Length == 0) { await FlushAsync(cancellationToken); return; @@ -825,7 +825,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http else { CheckLastWrite(); - await Output.WriteDataAsync(data, cancellationToken: cancellationToken); + await Output.WriteDataAsync(data.Span, cancellationToken: cancellationToken); } } else @@ -892,18 +892,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } } - private Task WriteChunkedAsync(ArraySegment data, CancellationToken cancellationToken) + private Task WriteChunkedAsync(ReadOnlyMemory data, CancellationToken cancellationToken) { return Output.WriteAsync(_writeChunk, data); } - private static void WriteChunk(PipeWriter writableBuffer, ArraySegment buffer) + private static void WriteChunk(PipeWriter writableBuffer, ReadOnlyMemory buffer) { var writer = OutputWriter.Create(writableBuffer); - if (buffer.Count > 0) + if (buffer.Length > 0) { - ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Count); - writer.Write(new ReadOnlySpan(buffer.Array, buffer.Offset, buffer.Count)); + ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Length); + writer.Write(buffer.Span); ChunkWriter.WriteEndChunkBytes(ref writer); } } diff --git a/src/Kestrel.Core/Internal/Http/HttpRequestStream.cs b/src/Kestrel.Core/Internal/Http/HttpRequestStream.cs index 2f677704c3..4f325f4d38 100644 --- a/src/Kestrel.Core/Internal/Http/HttpRequestStream.cs +++ b/src/Kestrel.Core/Internal/Http/HttpRequestStream.cs @@ -3,11 +3,12 @@ using System; using System.IO; +using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Protocols; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { @@ -105,20 +106,25 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - var task = ValidateState(cancellationToken); - if (task != null) - { - return task; - } + ValidateState(cancellationToken); - return ReadAsyncInternal(buffer, offset, count, cancellationToken); + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); } - private async Task ReadAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken) +#if NETCOREAPP2_1 + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + ValidateState(cancellationToken); + + return ReadAsyncInternal(destination, cancellationToken); + } +#endif + + private async ValueTask ReadAsyncInternal(Memory buffer, CancellationToken cancellationToken) { try { - return await _body.ReadAsync(new ArraySegment(buffer, offset, count), cancellationToken); + return await _body.ReadAsync(buffer, cancellationToken); } catch (ConnectionAbortedException ex) { @@ -137,11 +143,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http throw new ArgumentException(CoreStrings.PositiveNumberRequired, nameof(bufferSize)); } - var task = ValidateState(cancellationToken); - if (task != null) - { - return task; - } + ValidateState(cancellationToken); return CopyToAsyncInternal(destination, cancellationToken); } @@ -193,24 +195,29 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } } - private Task ValidateState(CancellationToken cancellationToken) + private void ValidateState(CancellationToken cancellationToken) { switch (_state) { case HttpStreamState.Open: if (cancellationToken.IsCancellationRequested) { - return Task.FromCanceled(cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); } break; case HttpStreamState.Closed: throw new ObjectDisposedException(nameof(HttpRequestStream)); case HttpStreamState.Aborted: - return _error != null ? - Task.FromException(_error) : - Task.FromCanceled(new CancellationToken(true)); + if (_error != null) + { + ExceptionDispatchInfo.Capture(_error).Throw(); + } + else + { + throw new TaskCanceledException(); + } + break; } - return null; } } } diff --git a/src/Kestrel.Core/Internal/Http/HttpResponseStream.cs b/src/Kestrel.Core/Internal/Http/HttpResponseStream.cs index e1c44d645f..93cea42956 100644 --- a/src/Kestrel.Core/Internal/Http/HttpResponseStream.cs +++ b/src/Kestrel.Core/Internal/Http/HttpResponseStream.cs @@ -41,12 +41,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override Task FlushAsync(CancellationToken cancellationToken) { - var task = ValidateState(cancellationToken); - if (task == null) - { - return _httpResponseControl.FlushAsync(cancellationToken); - } - return task; + ValidateState(cancellationToken); + + return _httpResponseControl.FlushAsync(cancellationToken); } public override long Seek(long offset, SeekOrigin origin) @@ -109,14 +106,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - var task = ValidateState(cancellationToken); - if (task == null) - { - return _httpResponseControl.WriteAsync(new ArraySegment(buffer, offset, count), cancellationToken); - } - return task; + ValidateState(cancellationToken); + + return _httpResponseControl.WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken); } +#if NETCOREAPP2_1 + public override Task WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + ValidateState(cancellationToken); + + return _httpResponseControl.WriteAsync(source, cancellationToken); + } +#endif + public void StartAcceptingWrites() { // Only start if not aborted @@ -147,14 +150,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http } } - private Task ValidateState(CancellationToken cancellationToken) + private void ValidateState(CancellationToken cancellationToken) { switch (_state) { case HttpStreamState.Open: if (cancellationToken.IsCancellationRequested) { - return Task.FromCanceled(cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); } break; case HttpStreamState.Closed: @@ -163,11 +166,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http if (cancellationToken.IsCancellationRequested) { // Aborted state only throws on write if cancellationToken requests it - return Task.FromCanceled(cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); } break; } - return null; } } } diff --git a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs index 019fa9fe77..abc2b4454a 100644 --- a/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http/IHttpOutputProducer.cs @@ -15,7 +15,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http Task FlushAsync(CancellationToken cancellationToken); Task Write100ContinueAsync(CancellationToken cancellationToken); void WriteResponseHeaders(int statusCode, string ReasonPhrase, HttpResponseHeaders responseHeaders); - Task WriteDataAsync(ArraySegment data, CancellationToken cancellationToken); + // The reason this is ReadOnlySpan and not ReadOnlyMemory is because writes are always + // synchronous. Flushing to get back pressure is the only time we truly go async but + // that's after the buffer is copied + Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken); Task WriteStreamSuffixAsync(CancellationToken cancellationToken); } } diff --git a/src/Kestrel.Core/Internal/Http/IHttpResponseControl.cs b/src/Kestrel.Core/Internal/Http/IHttpResponseControl.cs index fac6b11c0b..9a42aa6116 100644 --- a/src/Kestrel.Core/Internal/Http/IHttpResponseControl.cs +++ b/src/Kestrel.Core/Internal/Http/IHttpResponseControl.cs @@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public interface IHttpResponseControl { void ProduceContinue(); - Task WriteAsync(ArraySegment data, CancellationToken cancellationToken); + Task WriteAsync(ReadOnlyMemory data, CancellationToken cancellationToken); Task FlushAsync(CancellationToken cancellationToken); } } diff --git a/src/Kestrel.Core/Internal/Http/MessageBody.cs b/src/Kestrel.Core/Internal/Http/MessageBody.cs index 5d05eacf43..ad330f2c92 100644 --- a/src/Kestrel.Core/Internal/Http/MessageBody.cs +++ b/src/Kestrel.Core/Internal/Http/MessageBody.cs @@ -35,7 +35,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http protected IKestrelTrace Log => _context.ServiceContext.Log; - public virtual async Task ReadAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) + public virtual async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) { TryInit(); @@ -50,10 +50,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http if (!readableBuffer.IsEmpty) { // buffer.Count is int - var actual = (int) Math.Min(readableBuffer.Length, buffer.Count); + var actual = (int) Math.Min(readableBuffer.Length, buffer.Length); var slice = readableBuffer.Slice(0, actual); consumed = readableBuffer.GetPosition(readableBuffer.Start, actual); - slice.CopyTo(buffer); + slice.CopyTo(buffer.Span); return actual; } else if (result.IsCompleted) @@ -84,8 +84,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { foreach (var memory in readableBuffer) { + // REVIEW: This *could* be slower if 2 things are true + // - The WriteAsync(ReadOnlyMemory) isn't overridden on the destination + // - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory +#if NETCOREAPP2_1 + await destination.WriteAsync(memory); +#else var array = memory.GetArray(); await destination.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken); +#endif } } else if (result.IsCompleted) @@ -148,7 +155,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http public override bool IsEmpty => true; - public override Task ReadAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) => Task.FromResult(0); + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) => new ValueTask(0); public override Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken)) => Task.CompletedTask; diff --git a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs index e0dbf8bd09..0d693c0ced 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Frame.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Frame.cs @@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly byte[] _data = new byte[HeaderLength + MinAllowedMaxFrameSize]; - public ArraySegment Raw => new ArraySegment(_data, 0, HeaderLength + Length); + public Span Raw => new Span(_data, 0, HeaderLength + Length); public int Length { diff --git a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs index 0540beb0fa..362bdc9759 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs @@ -100,10 +100,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } } - public Task WriteDataAsync(int streamId, Span data, CancellationToken cancellationToken) + public Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken) => WriteDataAsync(streamId, data, endStream: false, cancellationToken: cancellationToken); - public Task WriteDataAsync(int streamId, Span data, bool endStream, CancellationToken cancellationToken) + public Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken) { var tasks = new List(); @@ -162,7 +162,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } } - public Task WritePingAsync(Http2PingFrameFlags flags, Span payload) + public Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySpan payload) { lock (_writeLock) { @@ -182,7 +182,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } // Must be called with _writeLock - private void Append(ArraySegment data) + private void Append(ReadOnlySpan data) { if (_completed) { @@ -194,15 +194,20 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 } // Must be called with _writeLock - private async Task WriteAsync(ArraySegment data, CancellationToken cancellationToken = default(CancellationToken)) + private Task WriteAsync(ReadOnlySpan data, CancellationToken cancellationToken = default(CancellationToken)) { if (_completed) { - return; + return Task.CompletedTask; } _outputWriter.Write(data); - await _outputWriter.FlushAsync(cancellationToken); + return FlushAsync(_outputWriter, cancellationToken); + } + + private async Task FlushAsync(PipeWriter outputWriter, CancellationToken cancellationToken) + { + await outputWriter.FlushAsync(cancellationToken); } private static IEnumerable> EnumerateHeaders(IHeaderDictionary headers) diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs index d1aa31df72..571f5a32d2 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs @@ -39,7 +39,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 public Task Write100ContinueAsync(CancellationToken cancellationToken) => _frameWriter.Write100ContinueAsync(_streamId); - public Task WriteDataAsync(ArraySegment data, CancellationToken cancellationToken) + public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellationToken) { return _frameWriter.WriteDataAsync(_streamId, data, cancellationToken); } diff --git a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs index 21a8192a44..aa7b23330b 100644 --- a/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs +++ b/src/Kestrel.Core/Internal/Http2/IHttp2FrameWriter.cs @@ -14,11 +14,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken)); Task Write100ContinueAsync(int streamId); void WriteResponseHeaders(int streamId, int statusCode, IHeaderDictionary headers); - Task WriteDataAsync(int streamId, Span data, CancellationToken cancellationToken); - Task WriteDataAsync(int streamId, Span data, bool endStream, CancellationToken cancellationToken); + Task WriteDataAsync(int streamId, ReadOnlySpan data, CancellationToken cancellationToken); + Task WriteDataAsync(int streamId, ReadOnlySpan data, bool endStream, CancellationToken cancellationToken); Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode); Task WriteSettingsAckAsync(); - Task WritePingAsync(Http2PingFrameFlags flags, Span payload); + Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySpan payload); Task WriteGoAwayAsync(int lastStreamId, Http2ErrorCode errorCode); } } diff --git a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs index e88add0081..5c5a0c2925 100644 --- a/test/Kestrel.Core.Tests/Http2ConnectionTests.cs +++ b/test/Kestrel.Core.Tests/Http2ConnectionTests.cs @@ -2136,10 +2136,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests return Task.WhenAll(_runningStreams.Values.Select(tcs => tcs.Task)).TimeoutAfter(TestConstants.DefaultTimeout); } - private async Task SendAsync(ArraySegment span) + private Task SendAsync(ReadOnlySpan span) { var writableBuffer = _pair.Application.Output; writableBuffer.Write(span); + return FlushAsync(writableBuffer); + } + + private static async Task FlushAsync(PipeWriter writableBuffer) + { await writableBuffer.FlushAsync(); } diff --git a/test/Kestrel.Core.Tests/HttpRequestStreamTests.cs b/test/Kestrel.Core.Tests/HttpRequestStreamTests.cs index a4baec6303..98c3d7ef68 100644 --- a/test/Kestrel.Core.Tests/HttpRequestStreamTests.cs +++ b/test/Kestrel.Core.Tests/HttpRequestStreamTests.cs @@ -118,7 +118,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests var mockBodyControl = new Mock(); mockBodyControl.Setup(m => m.AllowSynchronousIO).Returns(() => allowSynchronousIO); var mockMessageBody = new Mock((HttpProtocol)null); - mockMessageBody.Setup(m => m.ReadAsync(It.IsAny>(), CancellationToken.None)).ReturnsAsync(0); + mockMessageBody.Setup(m => m.ReadAsync(It.IsAny>(), CancellationToken.None)).Returns(new ValueTask(0)); var stream = new HttpRequestStream(mockBodyControl.Object); stream.StartAcceptingReads(mockMessageBody.Object); @@ -136,25 +136,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests } [Fact] - public void AbortCausesReadToCancel() + public async Task AbortCausesReadToCancel() { var stream = new HttpRequestStream(Mock.Of()); stream.StartAcceptingReads(null); stream.Abort(); - var task = stream.ReadAsync(new byte[1], 0, 1); - Assert.True(task.IsCanceled); + await Assert.ThrowsAsync(() => stream.ReadAsync(new byte[1], 0, 1)); } [Fact] - public void AbortWithErrorCausesReadToCancel() + public async Task AbortWithErrorCausesReadToCancel() { var stream = new HttpRequestStream(Mock.Of()); stream.StartAcceptingReads(null); var error = new Exception(); stream.Abort(error); - var task = stream.ReadAsync(new byte[1], 0, 1); - Assert.True(task.IsFaulted); - Assert.Same(error, task.Exception.InnerException); + var exception = await Assert.ThrowsAsync(() => stream.ReadAsync(new byte[1], 0, 1)); + Assert.Same(error, exception); } [Fact] @@ -167,25 +165,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests } [Fact] - public void AbortCausesCopyToAsyncToCancel() + public async Task AbortCausesCopyToAsyncToCancel() { var stream = new HttpRequestStream(Mock.Of()); stream.StartAcceptingReads(null); stream.Abort(); - var task = stream.CopyToAsync(Mock.Of()); - Assert.True(task.IsCanceled); + await Assert.ThrowsAsync(() => stream.CopyToAsync(Mock.Of())); } [Fact] - public void AbortWithErrorCausesCopyToAsyncToCancel() + public async Task AbortWithErrorCausesCopyToAsyncToCancel() { var stream = new HttpRequestStream(Mock.Of()); stream.StartAcceptingReads(null); var error = new Exception(); stream.Abort(error); - var task = stream.CopyToAsync(Mock.Of()); - Assert.True(task.IsFaulted); - Assert.Same(error, task.Exception.InnerException); + var exception = await Assert.ThrowsAsync(() => stream.CopyToAsync(Mock.Of())); + Assert.Same(error, exception); } [Fact] diff --git a/test/Kestrel.Core.Tests/HttpResponseStreamTests.cs b/test/Kestrel.Core.Tests/HttpResponseStreamTests.cs index 06bd3436cd..5e49e8a5a2 100644 --- a/test/Kestrel.Core.Tests/HttpResponseStreamTests.cs +++ b/test/Kestrel.Core.Tests/HttpResponseStreamTests.cs @@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests var mockBodyControl = new Mock(); mockBodyControl.Setup(m => m.AllowSynchronousIO).Returns(() => allowSynchronousIO); var mockHttpResponseControl = new Mock(); - mockHttpResponseControl.Setup(m => m.WriteAsync(It.IsAny>(), CancellationToken.None)).Returns(Task.CompletedTask); + mockHttpResponseControl.Setup(m => m.WriteAsync(It.IsAny>(), CancellationToken.None)).Returns(Task.CompletedTask); var stream = new HttpResponseStream(mockBodyControl.Object, mockHttpResponseControl.Object); stream.StartAcceptingWrites(); diff --git a/test/Kestrel.Core.Tests/MessageBodyTests.cs b/test/Kestrel.Core.Tests/MessageBodyTests.cs index ad6ac1237e..41d0724bd0 100644 --- a/test/Kestrel.Core.Tests/MessageBodyTests.cs +++ b/test/Kestrel.Core.Tests/MessageBodyTests.cs @@ -426,7 +426,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests { var writeCount = 0; var writeTcs = new TaskCompletionSource(); - var mockDestination = new Mock(); + var mockDestination = new Mock() { CallBase = true }; mockDestination .Setup(m => m.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), CancellationToken.None)) @@ -595,7 +595,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests // Time out on the next read input.Http1Connection.SendTimeoutResponse(); - var exception = await Assert.ThrowsAsync(() => body.ReadAsync(new ArraySegment(new byte[1]))); + var exception = await Assert.ThrowsAsync(async () => await body.ReadAsync(new Memory(new byte[1]))); Assert.Equal(StatusCodes.Status408RequestTimeout, exception.StatusCode); await body.StopAsync(); diff --git a/test/Kestrel.Core.Tests/TestHelpers/MockHttpResponseControl.cs b/test/Kestrel.Core.Tests/TestHelpers/MockHttpResponseControl.cs index 2ff611f49e..738a070635 100644 --- a/test/Kestrel.Core.Tests/TestHelpers/MockHttpResponseControl.cs +++ b/test/Kestrel.Core.Tests/TestHelpers/MockHttpResponseControl.cs @@ -19,7 +19,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests.TestHelpers { } - public Task WriteAsync(ArraySegment data, CancellationToken cancellationToken) + public Task WriteAsync(ReadOnlyMemory data, CancellationToken cancellationToken) { return Task.CompletedTask; }