From f7ac457b40410edd688a180e46a5ec2da3502606 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Fri, 11 Dec 2015 06:42:06 +0000 Subject: [PATCH] Use ValueTask --- .../Filter/LibuvStream.cs | 28 +++++++++-- .../Http/FrameRequestStream.cs | 8 ++-- .../Http/MessageBody.cs | 47 ++++++++++++++----- .../Http/SocketInputExtensions.cs | 28 ++++++++++- .../project.json | 3 +- 5 files changed, 94 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs index b4e92b2d76..1e019c8381 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs @@ -11,9 +11,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter { public class LibuvStream : Stream { + private readonly static Task _initialCachedTask = Task.FromResult(0); + private readonly SocketInput _input; private readonly ISocketOutput _output; + private Task _cachedTask = _initialCachedTask; + public LibuvStream(SocketInput input, ISocketOutput output) { _input = input; @@ -58,12 +62,30 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter public override int Read(byte[] buffer, int offset, int count) { - return ReadAsync(new ArraySegment(buffer, offset, count)).GetAwaiter().GetResult(); + // ValueTask uses .GetAwaiter().GetResult() if necessary + // https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156 + return ReadAsync(new ArraySegment(buffer, offset, count)).Result; } public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - return ReadAsync(new ArraySegment(buffer, offset, count)); + var task = ReadAsync(new ArraySegment(buffer, offset, count)); + + if (task.IsCompletedSuccessfully) + { + if (_cachedTask.Result != task.Result) + { + // Needs .AsTask to match Stream's Async method return types + _cachedTask = task.AsTask(); + } + } + else + { + // Needs .AsTask to match Stream's Async method return types + _cachedTask = task.AsTask(); + } + + return _cachedTask; } public override void Write(byte[] buffer, int offset, int count) @@ -99,7 +121,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter // No-op since writes are immediate. } - private Task ReadAsync(ArraySegment buffer) + private ValueTask ReadAsync(ArraySegment buffer) { return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count); } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs index f9ff5a4197..ba0c2062f7 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs @@ -53,7 +53,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { ValidateState(); - return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + // ValueTask uses .GetAwaiter().GetResult() if necessary + return ReadAsync(buffer, offset, count).Result; } #if NET451 @@ -80,7 +81,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http var tcs = new TaskCompletionSource(state); var task = _body.ReadAsync(new ArraySegment(buffer, offset, count), cancellationToken); - task.ContinueWith((task2, state2) => + task.AsTask().ContinueWith((task2, state2) => { var tcs2 = (TaskCompletionSource)state2; if (task2.IsCanceled) @@ -104,7 +105,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { ValidateState(); - return _body.ReadAsync(new ArraySegment(buffer, offset, count), cancellationToken); + // Needs .AsTask to match Stream's Async method return types + return _body.ReadAsync(new ArraySegment(buffer, offset, count), cancellationToken).AsTask(); } public override void Write(byte[] buffer, int offset, int count) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 8edd424a10..46b573758e 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -22,11 +22,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public bool RequestKeepAlive { get; protected set; } - public Task ReadAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) + public ValueTask ReadAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) { - Task result = null; var send100Continue = 0; - result = ReadAsyncImplementation(buffer, cancellationToken); + var result = ReadAsyncImplementation(buffer, cancellationToken); if (!result.IsCompleted) { send100Continue = Interlocked.Exchange(ref _send100Continue, 0); @@ -40,7 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public async Task Consume(CancellationToken cancellationToken = default(CancellationToken)) { - Task result; + ValueTask result; var send100checked = false; do { @@ -56,7 +55,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http send100checked = true; } } - else if (result.GetAwaiter().GetResult() == 0) + // ValueTask uses .GetAwaiter().GetResult() if necessary + else if (result.Result == 0) { // Completed Task, end of stream return; @@ -69,7 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } while (await result != 0); } - public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); + public abstract ValueTask ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); public static MessageBody For( string httpVersion, @@ -137,7 +137,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { } - public override Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) + public override ValueTask ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) { return _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, buffer.Array == null ? 8192 : buffer.Count); } @@ -156,7 +156,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http _inputLength = _contentLength; } - public override async Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) + public override ValueTask ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) { var input = _context.SocketInput; @@ -166,9 +166,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http return 0; } - var actual = await _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit); - _inputLength -= actual; + var task = _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit); + if (task.IsCompleted) + { + // .GetAwaiter().GetResult() done by ValueTask if needed + var actual = task.Result; + _inputLength -= actual; + if (actual == 0) + { + throw new InvalidDataException("Unexpected end of request content"); + } + return actual; + } + else + { + return ReadAsyncAwaited(task.AsTask()); + } + } + + private async Task ReadAsyncAwaited(Task task) + { + var actual = await task; + _inputLength -= actual; if (actual == 0) { throw new InvalidDataException("Unexpected end of request content"); @@ -178,7 +198,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } - /// /// http://tools.ietf.org/html/rfc2616#section-3.6.1 /// @@ -193,8 +212,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { RequestKeepAlive = keepAlive; } + public override ValueTask ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) + { + return ReadAsyncAwaited(buffer, cancellationToken); + } - public override async Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) + private async Task ReadAsyncAwaited(ArraySegment buffer, CancellationToken cancellationToken) { var input = _context.SocketInput; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs index c36d88155d..39cb3202a0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs @@ -8,7 +8,33 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public static class SocketInputExtensions { - public static async Task ReadAsync(this SocketInput input, byte[] buffer, int offset, int count) + public static ValueTask ReadAsync(this SocketInput input, byte[] buffer, int offset, int count) + { + while (true) + { + if (!input.IsCompleted) + { + return input.ReadAsyncAwaited(buffer, offset, count); + } + + var begin = input.ConsumingStart(); + + int actual; + var end = begin.CopyTo(buffer, offset, count, out actual); + input.ConsumingComplete(end, end); + + if (actual != 0) + { + return actual; + } + if (input.RemoteIntakeFin) + { + return 0; + } + } + } + + private static async Task ReadAsyncAwaited(this SocketInput input, byte[] buffer, int offset, int count) { while (true) { diff --git a/src/Microsoft.AspNet.Server.Kestrel/project.json b/src/Microsoft.AspNet.Server.Kestrel/project.json index c95d40a6dc..2e0caab7c0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/project.json +++ b/src/Microsoft.AspNet.Server.Kestrel/project.json @@ -20,7 +20,8 @@ "Microsoft.AspNet.Internal.libuv-Windows": { "version": "1.0.0-*", "type": "build" - } + }, + "System.Threading.Tasks.Extensions": "4.0.0-*" }, "frameworks": { "dnx451": { },