Refactoring response control flow

- Bring through both sync and async execution paths
- Remove callback pattern from from and socketoutput write calls
This commit is contained in:
Louis DeJardin 2015-09-03 19:50:57 -07:00
parent 1e39473047
commit 557f6d6993
9 changed files with 207 additions and 109 deletions

View File

@ -6,7 +6,9 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.Framework.Logging;
using Microsoft.Framework.Primitives;
@ -16,10 +18,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class Frame : FrameContext, IFrameControl
{
private static Encoding _ascii = Encoding.ASCII;
private static readonly Encoding _ascii = Encoding.ASCII;
private static readonly ArraySegment<byte> _endChunkBytes = CreateAsciiByteArraySegment("\r\n");
private static readonly ArraySegment<byte> _endChunkedResponseBytes = CreateAsciiByteArraySegment("0\r\n\r\n");
private static readonly ArraySegment<byte> _continueBytes = CreateAsciiByteArraySegment("HTTP/1.1 100 Continue\r\n\r\n");
private static readonly ArraySegment<byte> _emptyData = new ArraySegment<byte>(new byte[0]);
private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef");
private Mode _mode;
private bool _responseStarted;
@ -65,7 +69,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Consume()
{
var input = SocketInput;
for (; ;)
for (;;)
{
switch (_mode)
{
@ -244,7 +248,19 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
public void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state)
public void Flush()
{
ProduceStart(immediate: false);
SocketOutput.Write(_emptyData, immediate: true);
}
public Task FlushAsync(CancellationToken cancellationToken)
{
ProduceStart(immediate: false);
return SocketOutput.WriteAsync(_emptyData, immediate: true);
}
public void Write(ArraySegment<byte> data)
{
ProduceStart(immediate: false);
@ -252,63 +268,80 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
if (data.Count == 0)
{
callback(null, state);
return;
}
WriteChunkPrefix(data.Count);
WriteChunked(data);
}
else
{
SocketOutput.Write(data, immediate: true);
}
}
SocketOutput.Write(data, callback, state, immediate: !_autoChunk);
public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
ProduceStart(immediate: false);
if (_autoChunk)
{
WriteChunkSuffix();
if (data.Count == 0)
{
return TaskUtilities.CompletedTask;
}
return WriteChunkedAsync(data, cancellationToken);
}
else
{
return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
}
}
private void WriteChunkPrefix(int numOctets)
private void WriteChunked(ArraySegment<byte> data)
{
var numOctetBytes = CreateAsciiByteArraySegment(numOctets.ToString("x") + "\r\n");
SocketOutput.Write(numOctetBytes,
(error, _) =>
{
if (error != null)
{
Log.LogError("WriteChunkPrefix", error);
}
},
null,
immediate: false);
SocketOutput.Write(BeginChunkBytes(data.Count), immediate: false);
SocketOutput.Write(data, immediate: false);
SocketOutput.Write(_endChunkBytes, immediate: true);
}
private void WriteChunkSuffix()
private async Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
SocketOutput.Write(_endChunkBytes,
(error, _) =>
{
if (error != null)
{
Log.LogError("WriteChunkSuffix", error);
}
},
null,
immediate: true);
await SocketOutput.WriteAsync(BeginChunkBytes(data.Count), immediate: false, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(data, immediate: false, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(_endChunkBytes, immediate: true, cancellationToken: cancellationToken);
}
public static ArraySegment<byte> BeginChunkBytes(int dataCount)
{
var bytes = new byte[10]
{
_hex[((dataCount >> 0x1c) & 0x0f)],
_hex[((dataCount >> 0x18) & 0x0f)],
_hex[((dataCount >> 0x14) & 0x0f)],
_hex[((dataCount >> 0x10) & 0x0f)],
_hex[((dataCount >> 0x0c) & 0x0f)],
_hex[((dataCount >> 0x08) & 0x0f)],
_hex[((dataCount >> 0x04) & 0x0f)],
_hex[((dataCount >> 0x00) & 0x0f)],
(byte)'\r',
(byte)'\n',
};
// Determine the most-significant non-zero nibble
int total, shift;
total = (dataCount > 0xffff) ? 0x10 : 0x00;
dataCount >>= total;
shift = (dataCount > 0x00ff) ? 0x08 : 0x00;
dataCount >>= shift;
total |= shift;
total |= (dataCount > 0x000f) ? 0x04 : 0x00;
var offset = 7 - (total >> 2);
return new ArraySegment<byte>(bytes, offset, 10 - offset);
}
private void WriteChunkedResponseSuffix()
{
SocketOutput.Write(_endChunkedResponseBytes,
(error, _) =>
{
if (error != null)
{
Log.LogError("WriteChunkedResponseSuffix", error);
}
},
null,
immediate: true);
SocketOutput.Write(_endChunkedResponseBytes, immediate: true);
}
public void Upgrade(IDictionary<string, object> options, Func<object, Task> callback)
@ -332,16 +365,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
RequestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
SocketOutput.Write(
_continueBytes,
(error, _) =>
{
if (error != null)
{
Log.LogError("ProduceContinue ", error);
}
},
null);
SocketOutput.Write(_continueBytes);
}
}
@ -355,18 +379,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase);
var responseHeader = CreateResponseHeader(status, appCompleted, ResponseHeaders);
SocketOutput.Write(
responseHeader.Item1,
(error, state) =>
{
if (error != null)
{
Log.LogError("ProduceStart ", error);
}
((IDisposable)state).Dispose();
},
responseHeader.Item2,
immediate: immediate);
SocketOutput.Write(responseHeader.Item1, immediate: immediate);
responseHeader.Item2.Dispose();
}
public void ProduceEnd(Exception ex)
@ -660,6 +674,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
statusCode != 304;
}
private enum Mode
{
StartLine,

View File

@ -35,28 +35,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override void Flush()
{
FlushAsync(CancellationToken.None).Wait();
_context.FrameControl.Flush();
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(
new ArraySegment<byte>(new byte[0]),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
return _context.FrameControl.FlushAsync(cancellationToken);
}
public override long Seek(long offset, SeekOrigin origin)
@ -76,28 +60,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).Wait();
_context.FrameControl.Write(new ArraySegment<byte>(buffer, offset, count));
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(
new ArraySegment<byte>(buffer, offset, count),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
return _context.FrameControl.WriteAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
}
}
}

View File

@ -2,12 +2,17 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public interface IFrameControl
{
void ProduceContinue();
void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state);
void Write(ArraySegment<byte> data);
Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken);
void Flush();
Task FlushAsync(CancellationToken cancellationToken);
}
}
}

View File

@ -2,6 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
@ -10,6 +12,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
/// </summary>
public interface ISocketOutput
{
void Write(ArraySegment<byte> buffer, Action<Exception, object> callback, object state, bool immediate = true);
void Write(ArraySegment<byte> buffer, bool immediate = true);
Task WriteAsync(ArraySegment<byte> buffer, bool immediate = true, CancellationToken cancellationToken = default(CancellationToken));
}
}
}

View File

@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;
@ -96,11 +97,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private void ScheduleWrite()
{
_thread.Post(obj =>
{
var self = (SocketOutput)obj;
self.WriteAllPending();
}, this);
_thread.Post(_this => _this.WriteAllPending(), this);
}
// This is called on the libuv event loop
@ -211,6 +208,35 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}, context);
}
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
((ISocketOutput)this).WriteAsync(buffer, immediate).Wait();
}
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, CancellationToken cancellationToken)
{
// TODO: Optimize task being used, and remove callback model from the underlying Write
var tcs = new TaskCompletionSource<int>();
Write(
buffer,
(error, state) =>
{
if (error != null)
{
tcs.SetException(error);
}
else
{
tcs.SetResult(0);
}
},
tcs,
immediate: immediate);
return tcs.Task;
}
private class CallbackContext
{
public Exception Error;

View File

@ -0,0 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
{
public static class TaskUtilities
{
public static Task CompletedTask = NewCompletedTask();
private static Task NewCompletedTask()
{
var tcs = new TaskCompletionSource<int>();
tcs.SetResult(0);
return tcs.Task;
}
}
}

View File

@ -51,7 +51,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
continue;
}
#pragma warning disable CS0618
var value = Marshal.GetDelegateForFunctionPointer(procAddress, field.FieldType);
#pragma warning restore CS0618
field.SetValue(this, value);
}
}

View File

@ -0,0 +1,38 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Server.Kestrel.Http;
using System.Linq;
using System.Text;
using Xunit;
namespace Microsoft.AspNet.Server.KestrelTests
{
public class FrameTests
{
[Theory]
[InlineData(1, "1\r\n")]
[InlineData(10, "a\r\n")]
[InlineData(0x08, "8\r\n")]
[InlineData(0x10, "10\r\n")]
[InlineData(0x080, "80\r\n")]
[InlineData(0x100, "100\r\n")]
[InlineData(0x0800, "800\r\n")]
[InlineData(0x1000, "1000\r\n")]
[InlineData(0x08000, "8000\r\n")]
[InlineData(0x10000, "10000\r\n")]
[InlineData(0x080000, "80000\r\n")]
[InlineData(0x100000, "100000\r\n")]
[InlineData(0x0800000, "800000\r\n")]
[InlineData(0x1000000, "1000000\r\n")]
[InlineData(0x08000000, "8000000\r\n")]
[InlineData(0x10000000, "10000000\r\n")]
[InlineData(0x7fffffffL, "7fffffff\r\n")]
public void ChunkedPrefixMustBeHexCrLfWithoutLeadingZeros(int dataCount, string expected)
{
var beginChunkBytes = Frame.BeginChunkBytes(dataCount);
Assert.Equal(Encoding.ASCII.GetBytes(expected), beginChunkBytes.ToArray());
}
}
}

View File

@ -2,6 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Http;
namespace Microsoft.AspNet.Server.KestrelTests
@ -53,6 +55,26 @@ namespace Microsoft.AspNet.Server.KestrelTests
public void End(ProduceEndType endType)
{
}
void IFrameControl.ProduceContinue()
{
}
void IFrameControl.Write(ArraySegment<byte> data)
{
}
async Task IFrameControl.WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
}
void IFrameControl.Flush()
{
}
async Task IFrameControl.FlushAsync(CancellationToken cancellationToken)
{
}
}
}