Merge branch 'benaadams/valuetask' into dev

This commit is contained in:
Stephen Halter 2015-12-30 16:51:48 -08:00
commit 072f2b1a20
5 changed files with 94 additions and 20 deletions

View File

@ -11,9 +11,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
{
public class LibuvStream : Stream
{
private readonly static Task<int> _initialCachedTask = Task.FromResult(0);
private readonly SocketInput _input;
private readonly ISocketOutput _output;
private Task<int> _cachedTask = _initialCachedTask;
public LibuvStream(SocketInput input, ISocketOutput output)
{
_input = input;
@ -58,12 +62,30 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(new ArraySegment<byte>(buffer, offset, count)).GetAwaiter().GetResult();
// ValueTask uses .GetAwaiter().GetResult() if necessary
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
return ReadAsync(new ArraySegment<byte>(buffer, offset, count)).Result;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsync(new ArraySegment<byte>(buffer, offset, count));
var task = ReadAsync(new ArraySegment<byte>(buffer, offset, count));
if (task.IsCompletedSuccessfully)
{
if (_cachedTask.Result != task.Result)
{
// Needs .AsTask to match Stream's Async method return types
_cachedTask = task.AsTask();
}
}
else
{
// Needs .AsTask to match Stream's Async method return types
_cachedTask = task.AsTask();
}
return _cachedTask;
}
public override void Write(byte[] buffer, int offset, int count)
@ -99,7 +121,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Filter
// No-op since writes are immediate.
}
private Task<int> ReadAsync(ArraySegment<byte> buffer)
private ValueTask<int> ReadAsync(ArraySegment<byte> buffer)
{
return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count);
}

View File

@ -53,7 +53,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
ValidateState();
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// ValueTask uses .GetAwaiter().GetResult() if necessary
return ReadAsync(buffer, offset, count).Result;
}
#if NET451
@ -80,7 +81,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
task.ContinueWith((task2, state2) =>
task.AsTask().ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;
if (task2.IsCanceled)
@ -104,7 +105,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
ValidateState();
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
// Needs .AsTask to match Stream's Async method return types
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken).AsTask();
}
public override void Write(byte[] buffer, int offset, int count)

View File

@ -22,11 +22,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public bool RequestKeepAlive { get; protected set; }
public Task<int> ReadAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
public ValueTask<int> ReadAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
{
Task<int> result = null;
var send100Continue = 0;
result = ReadAsyncImplementation(buffer, cancellationToken);
var result = ReadAsyncImplementation(buffer, cancellationToken);
if (!result.IsCompleted)
{
send100Continue = Interlocked.Exchange(ref _send100Continue, 0);
@ -40,7 +39,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public async Task Consume(CancellationToken cancellationToken = default(CancellationToken))
{
Task<int> result;
ValueTask<int> result;
var send100checked = false;
do
{
@ -56,7 +55,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
send100checked = true;
}
}
else if (result.GetAwaiter().GetResult() == 0)
// ValueTask uses .GetAwaiter().GetResult() if necessary
else if (result.Result == 0)
{
// Completed Task, end of stream
return;
@ -69,7 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
} while (await result != 0);
}
public abstract Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken);
public abstract ValueTask<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken);
public static MessageBody For(
string httpVersion,
@ -137,7 +137,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
}
public override Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
public override ValueTask<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
return _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, buffer.Array == null ? 8192 : buffer.Count);
}
@ -156,7 +156,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_inputLength = _contentLength;
}
public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
public override ValueTask<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
var input = _context.SocketInput;
@ -166,9 +166,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return 0;
}
var actual = await _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit);
_inputLength -= actual;
var task = _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit);
if (task.IsCompleted)
{
// .GetAwaiter().GetResult() done by ValueTask if needed
var actual = task.Result;
_inputLength -= actual;
if (actual == 0)
{
throw new InvalidDataException("Unexpected end of request content");
}
return actual;
}
else
{
return ReadAsyncAwaited(task.AsTask());
}
}
private async Task<int> ReadAsyncAwaited(Task<int> task)
{
var actual = await task;
_inputLength -= actual;
if (actual == 0)
{
throw new InvalidDataException("Unexpected end of request content");
@ -178,7 +198,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
/// <summary>
/// http://tools.ietf.org/html/rfc2616#section-3.6.1
/// </summary>
@ -193,8 +212,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
RequestKeepAlive = keepAlive;
}
public override ValueTask<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
return ReadAsyncAwaited(buffer, cancellationToken);
}
public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
private async Task<int> ReadAsyncAwaited(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
var input = _context.SocketInput;

View File

@ -8,7 +8,33 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public static class SocketInputExtensions
{
public static async Task<int> ReadAsync(this SocketInput input, byte[] buffer, int offset, int count)
public static ValueTask<int> ReadAsync(this SocketInput input, byte[] buffer, int offset, int count)
{
while (true)
{
if (!input.IsCompleted)
{
return input.ReadAsyncAwaited(buffer, offset, count);
}
var begin = input.ConsumingStart();
int actual;
var end = begin.CopyTo(buffer, offset, count, out actual);
input.ConsumingComplete(end, end);
if (actual != 0)
{
return actual;
}
if (input.RemoteIntakeFin)
{
return 0;
}
}
}
private static async Task<int> ReadAsyncAwaited(this SocketInput input, byte[] buffer, int offset, int count)
{
while (true)
{

View File

@ -20,7 +20,8 @@
"Microsoft.AspNet.Internal.libuv-Windows": {
"version": "1.0.0-*",
"type": "build"
}
},
"System.Threading.Tasks.Extensions": "4.0.0-*"
},
"frameworks": {
"dnx451": { },