Changing flow control for incoming data

SocketInput is now awaitable
Input buffers are slab-allocated and pinned from large object heap
Request frame parsing is offloaded from libuv thread
This commit is contained in:
Louis DeJardin 2015-09-15 16:40:11 -07:00
parent 557f6d6993
commit 1f6aaebeda
25 changed files with 2823 additions and 413 deletions

View File

@ -16,7 +16,7 @@ namespace SampleApp
loggerFactory.AddConsole(LogLevel.Debug);
app.Run(context =>
app.Run(async context =>
{
Console.WriteLine("{0} {1}{2}{3}",
context.Request.Method,
@ -24,9 +24,11 @@ namespace SampleApp
context.Request.Path,
context.Request.QueryString);
await context.Request.Body.CopyToAsync(Console.OpenStandardOutput());
context.Response.ContentLength = 11;
context.Response.ContentType = "text/plain";
return context.Response.WriteAsync("Hello world");
await context.Response.WriteAsync("Hello world");
});
}
}

View File

@ -3,6 +3,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;
using Microsoft.Framework.Logging;
@ -11,13 +12,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class Connection : ConnectionContext, IConnectionControl
{
private static readonly Action<UvStreamHandle, int, Exception, object> _readCallback = ReadCallback;
private const int EOF = -4095;
private const int ECONNRESET = -4077;
private static readonly Action<UvStreamHandle, int, int, Exception, object> _readCallback = ReadCallback;
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = AllocCallback;
private static long _lastConnectionId;
private readonly UvStreamHandle _socket;
private Frame _frame;
private Task _frameTask;
private long _connectionId = 0;
private readonly object _stateLock = new object();
@ -35,9 +40,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
Log.ConnectionStart(_connectionId);
SocketInput = new SocketInput(Memory);
SocketInput = new SocketInput(Memory2);
SocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log);
_frame = new Frame(this);
_frameTask = Task.Run(_frame.ProcessFraming);
_socket.ReadStart(_allocCallback, _readCallback, this);
}
@ -48,27 +54,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = SocketInput.Pin(2048);
return handle.Libuv.buf_init(
SocketInput.Pin(2048),
2048);
result.DataPtr,
result.Data.Count);
}
private static void ReadCallback(UvStreamHandle handle, int nread, Exception error, object state)
private static void ReadCallback(UvStreamHandle handle, int readCount, int errorCode, Exception error, object state)
{
((Connection)state).OnRead(handle, nread, error);
((Connection)state).OnRead(handle, readCount, errorCode, error);
}
private void OnRead(UvStreamHandle handle, int status, Exception error)
private void OnRead(UvStreamHandle handle, int readCount, int errorCode, Exception error)
{
SocketInput.Unpin(status);
SocketInput.Unpin(readCount);
var normalRead = error == null && status > 0;
var normalDone = status == 0 || status == Constants.ECONNRESET || status == Constants.EOF;
var normalRead = readCount != 0 && errorCode == 0;
var normalDone = readCount == 0 && (errorCode == 0 || errorCode == Constants.ECONNRESET || errorCode == Constants.EOF);
var errorDone = !(normalDone || normalRead);
if (normalRead)
{
Log.ConnectionRead(_connectionId, status);
Log.ConnectionRead(_connectionId, readCount);
}
else if (normalDone || errorDone)
{
@ -85,16 +93,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
try
{
_frame.Consume();
}
catch (Exception ex)
{
Log.LogError("Connection._frame.Consume ", ex);
throw;
}
SocketInput.SetCompleted(errorDone ? error : null);
}
void IConnectionControl.Pause()
@ -124,17 +123,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Log.ConnectionWriteFin(_connectionId);
Thread.Post(
state =>
_this =>
{
var self = (Connection)state;
var shutdown = new UvShutdownReq(self.Log);
shutdown.Init(self.Thread.Loop);
shutdown.Shutdown(self._socket, (req, status, state2) =>
shutdown.Init(_this.Thread.Loop);
shutdown.Shutdown(_this._socket, (req, status, state2) =>
{
var self2 = (Connection)state2;
self2.Log.ConnectionWroteFin(_connectionId, status);
var __this = (Connection)state2;
__this.Log.ConnectionWroteFin(__this._connectionId, status);
req.Dispose();
}, this);
}, _this);
},
this);
break;
@ -159,12 +158,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Log.ConnectionDisconnect(_connectionId);
Thread.Post(
state =>
_this =>
{
Log.ConnectionStop(_connectionId);
((UvHandle)state).Dispose();
Log.ConnectionStop(_this._connectionId);
_this._socket.Dispose();
},
_socket);
this);
break;
}
}

View File

@ -11,6 +11,8 @@ using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.Framework.Logging;
using Microsoft.Framework.Primitives;
using System.Numerics;
using Microsoft.AspNet.Hosting.Builder;
// ReSharper disable AccessToModifiedClosure
@ -25,24 +27,22 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private static readonly ArraySegment<byte> _emptyData = new ArraySegment<byte>(new byte[0]);
private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef");
private Mode _mode;
private bool _responseStarted;
private bool _keepAlive;
private bool _autoChunk;
private readonly object _onStartingSync = new Object();
private readonly object _onCompletedSync = new Object();
private readonly FrameRequestHeaders _requestHeaders = new FrameRequestHeaders();
private readonly FrameResponseHeaders _responseHeaders = new FrameResponseHeaders();
private List<KeyValuePair<Func<object, Task>, object>> _onStarting;
private List<KeyValuePair<Func<object, Task>, object>> _onCompleted;
private object _onStartingSync = new Object();
private object _onCompletedSync = new Object();
private bool _responseStarted;
private bool _keepAlive;
private bool _autoChunk;
public Frame(ConnectionContext context) : base(context)
{
FrameControl = this;
StatusCode = 200;
RequestHeaders = _requestHeaders;
ResponseHeaders = _responseHeaders;
Reset();
}
public string Method { get; set; }
@ -66,95 +66,112 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
get { return _responseStarted; }
}
public void Consume()
public void Reset()
{
var input = SocketInput;
for (;;)
{
switch (_mode)
{
case Mode.StartLine:
if (input.Buffer.Count == 0 && input.RemoteIntakeFin)
{
_mode = Mode.Terminated;
break;
}
_onStarting = null;
_onCompleted = null;
if (!TakeStartLine(input))
{
if (input.RemoteIntakeFin)
{
_mode = Mode.Terminated;
break;
}
return;
}
_responseStarted = false;
_keepAlive = false;
_autoChunk = false;
_mode = Mode.MessageHeader;
break;
_requestHeaders.Reset();
ResetResponseHeaders();
case Mode.MessageHeader:
if (input.Buffer.Count == 0 && input.RemoteIntakeFin)
{
_mode = Mode.Terminated;
break;
}
Method = null;
RequestUri = null;
Path = null;
QueryString = null;
HttpVersion = null;
RequestHeaders = _requestHeaders;
MessageBody = null;
RequestBody = null;
StatusCode = 200;
ReasonPhrase = null;
ResponseHeaders = _responseHeaders;
ResponseBody = null;
DuplexStream = null;
var endOfHeaders = false;
while (!endOfHeaders)
{
if (!TakeMessageHeader(input, out endOfHeaders))
{
if (input.RemoteIntakeFin)
{
_mode = Mode.Terminated;
break;
}
return;
}
}
if (_mode == Mode.Terminated)
{
// If we broke out of the above while loop in the Terminated
// state, we don't want to transition to the MessageBody state.
break;
}
_mode = Mode.MessageBody;
Execute();
break;
case Mode.MessageBody:
if (MessageBody.LocalIntakeFin)
{
// NOTE: stop reading and resume on keepalive?
return;
}
MessageBody.Consume();
// NOTE: keep looping?
return;
case Mode.Terminated:
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
ConnectionControl.End(ProduceEndType.SocketDisconnect);
return;
}
}
}
private void Execute()
public void ResetResponseHeaders()
{
MessageBody = MessageBody.For(
HttpVersion,
RequestHeaders,
this);
_keepAlive = MessageBody.RequestKeepAlive;
RequestBody = new FrameRequestStream(MessageBody);
ResponseBody = new FrameResponseStream(this);
DuplexStream = new FrameDuplexStream(RequestBody, ResponseBody);
SocketInput.Free();
Task.Run(ExecuteAsync);
_responseHeaders.Reset();
_responseHeaders.HeaderServer = "Kestrel";
_responseHeaders.HeaderDate = DateTime.UtcNow.ToString("r");
}
public async Task ProcessFraming()
{
var terminated = false;
while (!terminated)
{
while (!terminated && !TakeStartLine(SocketInput))
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
{
await SocketInput;
}
else
{
var x = 5;
}
}
while (!terminated && !TakeMessageHeader2(SocketInput))
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
{
await SocketInput;
}
else
{
var x = 5;
}
}
if (!terminated)
{
MessageBody = MessageBody.For(HttpVersion, _requestHeaders, this);
_keepAlive = MessageBody.RequestKeepAlive;
RequestBody = new FrameRequestStream(MessageBody);
ResponseBody = new FrameResponseStream(this);
DuplexStream = new FrameDuplexStream(RequestBody, ResponseBody);
Exception error = null;
try
{
await Application.Invoke(this).ConfigureAwait(false);
// Trigger FireOnStarting if ProduceStart hasn't been called yet.
// We call it here, so it can go through our normal error handling
// and respond with a 500 if an OnStarting callback throws.
if (!_responseStarted)
{
FireOnStarting();
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
FireOnCompleted();
ProduceEnd(error);
}
terminated = !_keepAlive;
}
Reset();
}
// Connection Terminated!
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
ConnectionControl.End(ProduceEndType.SocketDisconnect);
}
public void OnStarting(Func<object, Task> callback, object state)
@ -222,32 +239,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
private async Task ExecuteAsync()
{
Exception error = null;
try
{
await Application.Invoke(this).ConfigureAwait(false);
// Trigger FireOnStarting if ProduceStart hasn't been called yet.
// We call it here, so it can go through our normal error handling
// and respond with a 500 if an OnStarting callback throws.
if (!_responseStarted)
{
FireOnStarting();
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
FireOnCompleted();
ProduceEnd(error);
}
}
public void Flush()
{
ProduceStart(immediate: false);
@ -402,8 +393,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
// the app func has failed. https://github.com/aspnet/KestrelHttpServer/issues/43
_onStarting = null;
ResponseHeaders = new FrameResponseHeaders();
ResponseHeaders["Content-Length"] = new[] { "0" };
ResetResponseHeaders();
_responseHeaders.HeaderContentLength = "0";
}
}
@ -522,58 +513,39 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return new Tuple<ArraySegment<byte>, IDisposable>(writer.Buffer, writer);
}
private bool TakeStartLine(SocketInput baton)
private bool TakeStartLine(SocketInput input)
{
var remaining = baton.Buffer;
if (remaining.Count < 2)
var begin = input.GetIterator();
if (begin.IsDefault) return false;
var end = begin.IndexOf(' ');
var method = begin.GetString(end);
char chFound;
begin = end.Add(1);
end = begin.IndexOfAny(' ', '?', out chFound);
var requestUri = begin.GetString(end);
begin = end;
end = chFound == '?' ? begin.IndexOf(' ') : begin;
var queryString = begin.GetString(end);
begin = end.Add(1);
end = begin.IndexOf('\r');
var httpVersion = begin.GetString(end);
end = end.Add(1);
if (end.Peek() != '\n')
{
return false;
}
var firstSpace = -1;
var secondSpace = -1;
var questionMark = -1;
var ch0 = remaining.Array[remaining.Offset];
for (var index = 0; index != remaining.Count - 1; ++index)
{
var ch1 = remaining.Array[remaining.Offset + index + 1];
if (ch0 == '\r' && ch1 == '\n')
{
if (secondSpace == -1)
{
throw new InvalidOperationException("INVALID REQUEST FORMAT");
}
Method = GetString(remaining, 0, firstSpace);
RequestUri = GetString(remaining, firstSpace + 1, secondSpace);
if (questionMark == -1)
{
Path = RequestUri;
QueryString = string.Empty;
}
else
{
Path = GetString(remaining, firstSpace + 1, questionMark);
QueryString = GetString(remaining, questionMark, secondSpace);
}
HttpVersion = GetString(remaining, secondSpace + 1, index);
baton.Skip(index + 2);
return true;
}
if (ch0 == ' ' && firstSpace == -1)
{
firstSpace = index;
}
else if (ch0 == ' ' && firstSpace != -1 && secondSpace == -1)
{
secondSpace = index;
}
else if (ch0 == '?' && firstSpace != -1 && questionMark == -1 && secondSpace == -1)
{
questionMark = index;
}
ch0 = ch1;
}
return false;
Method = method;
RequestUri = requestUri;
QueryString = queryString;
HttpVersion = httpVersion;
input.JumpTo(end.Add(1));
return true;
}
static string GetString(ArraySegment<byte> range, int startIndex, int endIndex)
@ -581,6 +553,68 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return Encoding.UTF8.GetString(range.Array, range.Offset + startIndex, endIndex - startIndex);
}
private bool TakeMessageHeader2(SocketInput baton)
{
char chFirst;
char chSecond;
var scan = baton.GetIterator();
while (!scan.IsDefault)
{
var beginName = scan;
scan = scan.IndexOfAny(':', '\r', out chFirst);
var endName = scan;
chSecond = scan.MoveNext();
if (chFirst == '\r' && chSecond == '\n')
{
baton.JumpTo(scan.Add(1));
return true;
}
if (chFirst == char.MinValue)
{
return false;
}
while (
chSecond == ' ' ||
chSecond == '\t' ||
chSecond == '\r' ||
chSecond == '\n')
{
chSecond = scan.MoveNext();
}
var beginValue = scan;
var wrapping = false;
while (!scan.IsDefault)
{
var endValue = scan = scan.IndexOf('\r');
chFirst = scan.MoveNext();
if (chFirst != '\n')
{
continue;
}
chSecond = scan.MoveNext();
if (chSecond == ' ' || chSecond == '\t')
{
wrapping = true;
continue;
}
var name = beginName.GetArraySegment(endName);
var value = beginValue.GetString(endValue);
if (wrapping)
{
value = value.Replace("\r\n", " ");
}
_requestHeaders.Append(name.Array, name.Offset, name.Count, value);
break;
}
}
return false;
}
private bool TakeMessageHeader(SocketInput baton, out bool endOfHeaders)
{
@ -673,14 +707,5 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
statusCode != 205 &&
statusCode != 304;
}
private enum Mode
{
StartLine,
MessageHeader,
MessageBody,
Terminated,
}
}
}

View File

@ -21,7 +21,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
return GetValueFast(key);
}
set
{
SetValueFast(key, value);
@ -36,6 +35,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ICollection<StringValues> IDictionary<string, StringValues>.Values => ((IDictionary<string, StringValues>)this).Select(pair => pair.Value).ToList();
public void Reset()
{
ClearFast();
}
protected static StringValues AppendValue(StringValues existing, string append)
{
return StringValues.Concat(existing, append);

View File

@ -72,13 +72,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count));
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
}
public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count));
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
task.ContinueWith((task2, state2) =>
{
var tcs2 = (TaskCompletionSource<int>)state2;

View File

@ -15,6 +15,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
protected Listener(ServiceContext serviceContext) : base(serviceContext)
{
Memory2 = new MemoryPool2();
}
protected UvStreamHandle ListenSocket { get; private set; }

View File

@ -22,6 +22,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Thread = listenerContext.Thread;
Application = listenerContext.Application;
Memory = listenerContext.Memory;
Memory2 = listenerContext.Memory2;
Log = listenerContext.Log;
}
@ -31,6 +32,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public IMemoryPool Memory { get; set; }
public MemoryPool2 Memory2 { get; set; }
public IKestrelTrace Log { get; }
}
}

View File

@ -59,7 +59,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
DispatchPipe.ReadStart(
(_1, _2, _3) => buf,
(_1, status2, error2, state2) =>
(_1, status2, errCode, error2, state2) =>
{
if (status2 < 0)
{

View File

@ -0,0 +1,109 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class MemoryPool2 : IDisposable
{
private const int blockStride = 4096;
private const int blockUnused = 64;
private const int blockCount = 32;
private const int blockLength = blockStride - blockUnused;
private const int slabLength = blockStride * blockCount;
private ConcurrentStack<MemoryPoolBlock2> _blocks = new ConcurrentStack<MemoryPoolBlock2>();
private ConcurrentStack<MemoryPoolSlab2> _slabs = new ConcurrentStack<MemoryPoolSlab2>();
public MemoryPoolBlock2 Lease(int minimumSize)
{
if (minimumSize > blockLength)
{
return MemoryPoolBlock2.Create(
new ArraySegment<byte>(new byte[minimumSize]),
dataPtr: IntPtr.Zero,
pool: null,
slab: null);
}
for (;;)
{
MemoryPoolBlock2 block;
if (_blocks.TryPop(out block))
{
return block;
}
AllocateSlab();
}
}
private void AllocateSlab()
{
var slab = MemoryPoolSlab2.Create(slabLength);
_slabs.Push(slab);
var basePtr = slab.ArrayPtr;
var firstOffset = (blockStride - 1) - ((ushort)(basePtr + blockStride - 1) % blockStride);
for (var offset = firstOffset;
offset + blockLength <= slabLength;
offset += blockStride)
{
var block = MemoryPoolBlock2.Create(
new ArraySegment<byte>(slab.Array, offset, blockLength),
basePtr,
this,
slab);
Return(block);
}
}
public void Return(MemoryPoolBlock2 block)
{
block.Reset();
_blocks.Push(block);
}
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
MemoryPoolSlab2 slab;
while (_slabs.TryPop(out slab))
{
// dispose managed state (managed objects).
slab.Dispose();
}
}
// N/A: free unmanaged resources (unmanaged objects) and override a finalizer below.
// N/A: set large fields to null.
disposedValue = true;
}
}
// N/A: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~MemoryPool2() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// N/A: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
}

View File

@ -0,0 +1,479 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Numerics;
using System.Runtime.InteropServices;
using System.Text;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class MemoryPoolBlock2
{
private static Vector<byte> _dotIndex = new Vector<byte>(Enumerable.Range(0, Vector<byte>.Count).Select(x => (byte)-x).ToArray());
private static Vector<byte> _dotCount = new Vector<byte>(Byte.MaxValue);
private GCHandle _pinHandle;
private IntPtr _dataArrayPtr;
public ArraySegment<byte> Data;
protected MemoryPoolBlock2()
{
}
public MemoryPool2 Pool { get; private set; }
public MemoryPoolSlab2 Slab { get; private set; }
public byte[] Array => Data.Array;
public int Start { get; set; }
public int End { get; set; }
public MemoryPoolBlock2 Next { get; set; }
~MemoryPoolBlock2()
{
if (_pinHandle.IsAllocated)
{
_pinHandle.Free();
}
if (Slab != null && Slab.IsActive)
{
Pool.Return(new MemoryPoolBlock2
{
_dataArrayPtr = _dataArrayPtr,
Data = Data,
Pool = Pool,
Slab = Slab,
});
}
}
public IntPtr Pin()
{
Debug.Assert(!_pinHandle.IsAllocated);
if (_dataArrayPtr != IntPtr.Zero)
{
return _dataArrayPtr + End;
}
else
{
_pinHandle = GCHandle.Alloc(Data.Array, GCHandleType.Pinned);
return _pinHandle.AddrOfPinnedObject() + End;
}
}
public void Unpin()
{
if (_dataArrayPtr == IntPtr.Zero)
{
Debug.Assert(_pinHandle.IsAllocated);
_pinHandle.Free();
}
}
public static MemoryPoolBlock2 Create(int size, MemoryPool2 pool)
{
return new MemoryPoolBlock2
{
Data = new ArraySegment<byte>(new byte[size]),
Pool = pool
};
}
public static MemoryPoolBlock2 Create(
ArraySegment<byte> data,
IntPtr dataPtr,
MemoryPool2 pool,
MemoryPoolSlab2 slab)
{
return new MemoryPoolBlock2
{
Data = data,
_dataArrayPtr = dataPtr,
Pool = pool,
Slab = slab,
Start = data.Offset,
End = data.Offset,
};
}
public void Reset()
{
Next = null;
Start = Data.Offset;
End = Data.Offset;
}
public override string ToString()
{
return Encoding.ASCII.GetString(Array, Start, End - Start);
}
public Iterator GetIterator()
{
return new Iterator(this);
}
public struct Iterator
{
private MemoryPoolBlock2 _block;
private int _index;
public Iterator(MemoryPoolBlock2 block)
{
_block = block;
_index = _block?.Start ?? 0;
}
public Iterator(MemoryPoolBlock2 block, int index)
{
_block = block;
_index = index;
}
public bool IsDefault => _block == null;
public MemoryPoolBlock2 Block => _block;
public int Index => _index;
public bool HasAtLeast(int count)
{
var scan = _block;
var index = _index;
while (scan != null)
{
if (count <= scan.End - index)
{
return true;
}
count -= scan.End - index;
scan = scan.Next;
index = scan?.Start ?? 0;
}
return false;
}
public Iterator Add(int count)
{
var block = _block;
var index = _index;
while (block != null)
{
var tailCount = block.End - index;
if (count < tailCount)
{
return new Iterator(block, index + count);
}
count -= tailCount;
block = block.Next;
index = block?.Start ?? 0;
}
return new Iterator(block, index + count);
}
public Iterator CopyTo(byte[] array, int offset, int count, out int actual)
{
var block = _block;
var index = _index;
var remaining = count;
while (block != null && remaining != 0)
{
var copyLength = Math.Min(remaining, block.End - index);
Buffer.BlockCopy(block.Array, index, array, offset, copyLength);
index += copyLength;
offset += copyLength;
remaining -= copyLength;
if (index == block.End)
{
block = block.Next;
index = block?.Start ?? 0;
}
}
actual = count - remaining;
return new Iterator(block, index);
}
public char MoveNext()
{
var block = _block;
var index = _index;
while (block != null && index == block.End)
{
block = block.Next;
index = block?.Start ?? 0;
}
if (block != null)
{
++index;
}
while (block != null && index == block.End)
{
block = block.Next;
index = block?.Start ?? 0;
}
_block = block;
_index = index;
return block != null ? (char)block.Array[index] : char.MinValue;
}
public int Peek()
{
while (_block != null)
{
if (_index < _block.End)
{
return _block.Data.Array[_index];
}
_block = _block.Next;
_index = _block.Start;
}
return -1;
}
public Iterator IndexOf(char char0)
{
var byte0 = (byte)char0;
var vectorStride = Vector<byte>.Count;
var ch0Vector = new Vector<byte>(byte0);
var scanBlock = _block;
var scanArray = scanBlock?.Array;
var scanIndex = _index;
while (scanBlock != null)
{
var tailCount = scanBlock.End - scanIndex;
if (tailCount == 0)
{
scanBlock = scanBlock.Next;
scanArray = scanBlock?.Array;
scanIndex = scanBlock?.Start ?? 0;
continue;
}
if (tailCount >= vectorStride)
{
var data = new Vector<byte>(scanBlock.Array, scanIndex);
var ch0Equals = Vector.Equals(data, ch0Vector);
var ch0Count = Vector.Dot(ch0Equals, _dotCount);
if (ch0Count == 0)
{
scanIndex += vectorStride;
continue;
}
else if (ch0Count == 1)
{
return new Iterator(scanBlock, scanIndex + Vector.Dot(ch0Equals, _dotIndex));
}
else
{
tailCount = vectorStride;
}
}
for (; tailCount != 0; tailCount--, scanIndex++)
{
var ch = scanBlock.Array[scanIndex];
if (ch == byte0)
{
return new Iterator(scanBlock, scanIndex);
}
}
}
return new Iterator(null, 0);
}
public Iterator IndexOfAny(char char0, char char1, out char chFound)
{
var byte0 = (byte)char0;
var byte1 = (byte)char1;
var vectorStride = Vector<byte>.Count;
var ch0Vector = new Vector<byte>(byte0);
var ch1Vector = new Vector<byte>(byte1);
var scanBlock = _block;
var scanArray = scanBlock?.Array;
var scanIndex = _index;
while (scanBlock != null)
{
var tailCount = scanBlock.End - scanIndex;
if (tailCount == 0)
{
scanBlock = scanBlock.Next;
scanArray = scanBlock?.Array;
scanIndex = scanBlock?.Start ?? 0;
continue;
}
if (tailCount >= vectorStride)
{
var data = new Vector<byte>(scanBlock.Array, scanIndex);
var ch0Equals = Vector.Equals(data, ch0Vector);
var ch0Count = Vector.Dot(ch0Equals, _dotCount);
var ch1Equals = Vector.Equals(data, ch1Vector);
var ch1Count = Vector.Dot(ch1Equals, _dotCount);
if (ch0Count == 0 && ch1Count == 0)
{
scanIndex += vectorStride;
continue;
}
else if (ch0Count < 2 && ch1Count < 2)
{
var ch0Index = ch0Count == 1 ? Vector.Dot(ch0Equals, _dotIndex) : byte.MaxValue;
var ch1Index = ch1Count == 1 ? Vector.Dot(ch1Equals, _dotIndex) : byte.MaxValue;
if (ch0Index < ch1Index)
{
chFound = char0;
return new Iterator(scanBlock, scanIndex + ch0Index);
}
else
{
chFound = char1;
return new Iterator(scanBlock, scanIndex + ch1Index);
}
}
else
{
tailCount = vectorStride;
}
}
for (; tailCount != 0; tailCount--, scanIndex++)
{
var chIndex = scanBlock.Array[scanIndex];
if (chIndex == byte0)
{
chFound = char0;
return new Iterator(scanBlock, scanIndex);
}
else if (chIndex == byte1)
{
chFound = char1;
return new Iterator(scanBlock, scanIndex);
}
}
}
chFound = char.MinValue;
return new Iterator(null, 0);
}
public int GetLength(Iterator end)
{
var length = 0;
var block = _block;
var index = _index;
for (;;)
{
if (block == end._block)
{
return length + end._index - index;
}
if (block == null)
{
throw new Exception("end was not after iterator");
}
length += block.End - index;
block = block.Next;
index = block?.Start ?? 0;
}
}
public string GetString(Iterator end)
{
if (IsDefault || end.IsDefault)
{
return default(string);
}
if (end._block == _block)
{
return Encoding.ASCII.GetString(_block.Array, _index, end._index - _index);
}
if (end._block == _block.Next && end._index == end._block.Start)
{
return Encoding.ASCII.GetString(_block.Array, _index, _block.End - _index);
}
var length = GetLength(end);
var result = new char[length];
var offset = 0;
var decoder = Encoding.ASCII.GetDecoder();
var block = _block;
var index = _index;
while (length != 0)
{
if (block == null)
{
throw new Exception("Unexpected end of data");
}
var count = Math.Min(block.End - index, length);
int bytesUsed;
int textAdded;
bool completed;
decoder.Convert(
block.Array,
index,
count,
result,
offset,
length,
count == length,
out bytesUsed,
out textAdded,
out completed);
Debug.Assert(bytesUsed == count);
Debug.Assert(textAdded == count);
offset += count;
length -= count;
block = block.Next;
index = block?.Start ?? 0;
}
return new string(result);
}
public ArraySegment<byte> GetArraySegment(Iterator end)
{
if (IsDefault || end.IsDefault)
{
return default(ArraySegment<byte>);
}
if (end._block == _block)
{
return new ArraySegment<byte>(_block.Array, _index, end._index - _index);
}
if (end._block == _block.Next && end._index == end._block.Start)
{
return new ArraySegment<byte>(_block.Array, _index, _block.End - _index);
}
var length = GetLength(end);
var result = new ArraySegment<byte>(new byte[length]);
var offset = result.Offset;
var block = _block;
var index = _index;
while (length != 0)
{
if (block == null)
{
throw new Exception("Unexpected end of data");
}
var count = Math.Min(block.End - index, length);
Buffer.BlockCopy(block.Array, index, result.Array, offset, count);
offset += count;
length -= count;
block = block.Next;
index = block?.Start ?? 0;
}
return result;
}
}
}
}

View File

@ -0,0 +1,68 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class MemoryPoolSlab2 : IDisposable
{
private GCHandle _gcHandle;
public byte[] Array;
public IntPtr ArrayPtr;
public bool IsActive;
public static MemoryPoolSlab2 Create(int length)
{
var array = new byte[length];
var gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);
return new MemoryPoolSlab2
{
Array = array,
_gcHandle = gcHandle,
ArrayPtr = gcHandle.AddrOfPinnedObject(),
IsActive = true,
};
}
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// N/A: dispose managed state (managed objects).
}
// free unmanaged resources (unmanaged objects) and override a finalizer below.
IsActive = false;
_gcHandle.Free();
// set large fields to null.
Array = null;
disposedValue = true;
}
}
// override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
~MemoryPoolSlab2()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(false);
}
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// uncomment the following line if the finalizer is overridden above.
GC.SuppressFinalize(this);
}
#endregion
}
}

View File

@ -3,6 +3,8 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Framework.Primitives;
namespace Microsoft.AspNet.Server.Kestrel.Http
@ -15,18 +17,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public bool RequestKeepAlive { get; protected set; }
public void Intake(int count)
{
Transfer(count, false);
}
public void IntakeFin(int count)
{
Transfer(count, true);
}
public abstract void Consume();
public static MessageBody For(
string httpVersion,
IDictionary<string, StringValues> headers,
@ -93,48 +83,49 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
}
public override void Consume()
public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
var input = _context.SocketInput;
if (input.RemoteIntakeFin)
{
IntakeFin(input.Buffer.Count);
}
else
{
Intake(input.Buffer.Count);
}
await input;
var begin = input.GetIterator();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, buffer.Count, out actual);
input.JumpTo(end);
return actual;
}
}
class ForContentLength : MessageBody
{
private readonly int _contentLength;
private int _neededLength;
private int _inputLength;
public ForContentLength(bool keepAlive, int contentLength, FrameContext context)
: base(context)
{
RequestKeepAlive = keepAlive;
_contentLength = contentLength;
_neededLength = _contentLength;
_inputLength = _contentLength;
}
public override void Consume()
public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
var input = _context.SocketInput;
var consumeLength = Math.Min(_neededLength, input.Buffer.Count);
_neededLength -= consumeLength;
if (_neededLength != 0)
var limit = Math.Min(buffer.Count, _inputLength);
if (limit != 0)
{
Intake(consumeLength);
}
else
{
IntakeFin(consumeLength);
await input;
}
var begin = input.GetIterator();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual);
_inputLength -= actual;
input.JumpTo(end);
return actual;
}
}
@ -144,9 +135,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
/// </summary>
class ForChunkedEncoding : MessageBody
{
private int _neededLength;
private int _inputLength;
private Mode _mode = Mode.ChunkSizeLine;
private Mode _mode = Mode.ChunkPrefix;
public ForChunkedEncoding(bool keepAlive, FrameContext context)
: base(context)
@ -154,80 +145,83 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
RequestKeepAlive = keepAlive;
}
public override void Consume()
public override async Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
var input = _context.SocketInput;
for (; ;)
while (_mode != Mode.Complete)
{
switch (_mode)
while (_mode == Mode.ChunkPrefix)
{
case Mode.ChunkSizeLine:
var chunkSize = 0;
if (!TakeChunkedLine(input, ref chunkSize))
{
return;
}
_neededLength = chunkSize;
if (chunkSize == 0)
{
_mode = Mode.Complete;
IntakeFin(0);
return;
}
var chunkSize = 0;
if (!TakeChunkedLine(input, ref chunkSize))
{
await input;
}
else if (chunkSize == 0)
{
_mode = Mode.Complete;
}
else
{
_mode = Mode.ChunkData;
break;
}
_inputLength = chunkSize;
}
while (_mode == Mode.ChunkData)
{
var limit = Math.Min(buffer.Count, _inputLength);
if (limit != 0)
{
await input;
}
case Mode.ChunkData:
if (_neededLength == 0)
{
_mode = Mode.ChunkDataCRLF;
break;
}
if (input.Buffer.Count == 0)
{
return;
}
var begin = input.GetIterator();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual);
_inputLength -= actual;
input.JumpTo(end);
var consumeLength = Math.Min(_neededLength, input.Buffer.Count);
_neededLength -= consumeLength;
if (_inputLength == 0)
{
_mode = Mode.ChunkSuffix;
}
Intake(consumeLength);
break;
case Mode.ChunkDataCRLF:
if (input.Buffer.Count < 2)
{
return;
}
var crlf = input.Take(2);
if (crlf.Array[crlf.Offset] != '\r' ||
crlf.Array[crlf.Offset + 1] != '\n')
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
_mode = Mode.ChunkSizeLine;
break;
default:
return actual;
}
while (_mode == Mode.ChunkSuffix)
{
var begin = input.GetIterator();
var ch1 = begin.Peek();
var ch2 = begin.MoveNext();
if (ch1 == char.MinValue || ch2 == char.MinValue)
{
await input;
}
else if (ch1 == '\r' && ch2 == '\n')
{
input.JumpTo(begin.Add(1));
_mode = Mode.ChunkPrefix;
}
else
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
}
}
return 0;
}
private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut)
{
var remaining = baton.Buffer;
if (remaining.Count < 2)
{
return false;
}
var ch0 = remaining.Array[remaining.Offset];
var remaining = baton.GetIterator();
var ch0 = remaining.Peek();
var chunkSize = 0;
var mode = 0;
for (var index = 0; index != remaining.Count - 1; ++index)
while(ch0 != -1)
{
var ch1 = remaining.Array[remaining.Offset + index + 1];
var ch1 = remaining.MoveNext();
if (mode == 0)
{
@ -269,7 +263,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
else if (ch0 == '\r' && ch1 == '\n')
{
baton.Skip(index + 2);
baton.JumpTo(remaining.Add(1));
chunkSizeOut = chunkSize;
return true;
}
@ -282,7 +276,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
if (ch0 == '\r' && ch1 == '\n')
{
baton.Skip(index + 2);
baton.JumpTo(remaining.Add(1));
chunkSizeOut = chunkSize;
return true;
}
@ -299,9 +293,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private enum Mode
{
ChunkSizeLine,
ChunkPrefix,
ChunkData,
ChunkDataCRLF,
ChunkSuffix,
Complete,
};
}

View File

@ -119,6 +119,31 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return result;
}
public Task<int> ReadAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
Task<int> result = null;
var send100Continue = false;
result = ReadAsyncImplementation(buffer, cancellationToken);
if (!result.IsCompleted)
{
lock (_sync)
{
send100Continue = _send100Continue;
_send100Continue = false;
}
}
if (send100Continue)
{
_context.FrameControl.ProduceContinue();
}
return result;
}
public virtual Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
throw new NotImplementedException("TODO");
}
static void CompletePending(object state)
{
while (((MessageBodyExchanger)state).CompletePending())

View File

@ -3,25 +3,45 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketInput
public class SocketInput : ICriticalNotifyCompletion
{
private readonly IMemoryPool _memory;
private GCHandle _gcHandle;
private static readonly Action _awaitableIsCompleted = () => { };
private static readonly Action _awaitableIsNotCompleted = () => { };
public SocketInput(IMemoryPool memory)
private readonly MemoryPool2 _memory;
private Action _awaitableState;
private Exception _awaitableError;
private MemoryPoolBlock2 _head;
private MemoryPoolBlock2 _tail;
private MemoryPoolBlock2 _pinned;
private readonly object _syncHeadAndTail = new Object();
public SocketInput(MemoryPool2 memory)
{
_memory = memory;
Buffer = new ArraySegment<byte>(_memory.Empty, 0, 0);
_awaitableState = _awaitableIsNotCompleted;
}
public ArraySegment<byte> Buffer { get; set; }
public bool RemoteIntakeFin { get; set; }
public bool IsCompleted
{
get
{
return Equals(_awaitableState, _awaitableIsCompleted);
}
}
public void Skip(int count)
{
@ -35,80 +55,183 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return taken;
}
public void Free()
public PinResult Pin(int minimumSize)
{
if (Buffer.Count == 0 && Buffer.Array.Length != 0)
lock (_syncHeadAndTail)
{
_memory.FreeByte(Buffer.Array);
Buffer = new ArraySegment<byte>(_memory.Empty, 0, 0);
}
}
public ArraySegment<byte> Available(int minimumSize)
{
if (Buffer.Count == 0 && Buffer.Offset != 0)
{
Buffer = new ArraySegment<byte>(Buffer.Array, 0, 0);
}
var availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count;
if (availableSize < minimumSize)
{
if (availableSize + Buffer.Offset >= minimumSize)
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
{
Array.Copy(Buffer.Array, Buffer.Offset, Buffer.Array, 0, Buffer.Count);
if (Buffer.Count != 0)
_pinned = _tail;
var data = new ArraySegment<byte>(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End);
var dataPtr = _pinned.Pin();
return new PinResult
{
Buffer = new ArraySegment<byte>(Buffer.Array, 0, Buffer.Count);
}
availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count;
}
else
{
var largerSize = Buffer.Array.Length + Math.Max(Buffer.Array.Length, minimumSize);
var larger = new ArraySegment<byte>(_memory.AllocByte(largerSize), 0, Buffer.Count);
if (Buffer.Count != 0)
{
Array.Copy(Buffer.Array, Buffer.Offset, larger.Array, 0, Buffer.Count);
}
_memory.FreeByte(Buffer.Array);
Buffer = larger;
availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count;
Data = data,
DataPtr = dataPtr,
};
}
}
return new ArraySegment<byte>(Buffer.Array, Buffer.Offset + Buffer.Count, availableSize);
}
public void Extend(int count)
{
Debug.Assert(count >= 0);
Debug.Assert(Buffer.Offset >= 0);
Debug.Assert(Buffer.Offset <= Buffer.Array.Length);
Debug.Assert(Buffer.Offset + Buffer.Count <= Buffer.Array.Length);
Debug.Assert(Buffer.Offset + Buffer.Count + count <= Buffer.Array.Length);
Buffer = new ArraySegment<byte>(Buffer.Array, Buffer.Offset, Buffer.Count + count);
}
public IntPtr Pin(int minimumSize)
{
var segment = Available(minimumSize);
_gcHandle = GCHandle.Alloc(segment.Array, GCHandleType.Pinned);
return _gcHandle.AddrOfPinnedObject() + segment.Offset;
_pinned = _memory.Lease(minimumSize);
return new PinResult
{
Data = _pinned.Data,
DataPtr = _pinned.Pin()
};
}
public void Unpin(int count)
{
// read_cb may called without an earlier alloc_cb
// this does not need to be thread-safe
// IsAllocated is checked only because Unpin can be called redundantly
if (_gcHandle.IsAllocated)
// Unpin may called without an earlier Pin
if (_pinned != null)
{
_gcHandle.Free();
Extend(count);
lock (_syncHeadAndTail)
{
_pinned.End += count;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
}
}
_pinned = null;
}
}
public SocketInput GetAwaiter()
{
return this;
}
public void OnCompleted(Action continuation)
{
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
continuation,
_awaitableIsNotCompleted);
if (awaitableState == _awaitableIsNotCompleted)
{
return;
}
else if (awaitableState == _awaitableIsCompleted)
{
Task.Run(continuation);
}
else
{
// THIS IS AN ERROR STATE - ONLY ONE WAITER CAN WAIT
}
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
public void SetCompleted(Exception error)
{
if (error != null)
{
_awaitableError = error;
}
var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
Task.Run(awaitableState);
}
}
public void SetNotCompleted()
{
if (RemoteIntakeFin || _awaitableError != null)
{
// TODO: Race condition - setting either of these can leave awaitable not completed
return;
}
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
if (awaitableState == _awaitableIsNotCompleted)
{
return;
}
else if (awaitableState == _awaitableIsCompleted)
{
return;
}
else
{
// THIS IS AN ERROR STATE - ONLY ONE WAITER MAY EXIST
}
}
public void GetResult()
{
var error = _awaitableError;
if (error != null)
{
throw new AggregateException(error);
}
}
public MemoryPoolBlock2.Iterator GetIterator()
{
lock (_syncHeadAndTail)
{
return new MemoryPoolBlock2.Iterator(_head);
}
}
public void JumpTo(MemoryPoolBlock2.Iterator iterator)
{
MemoryPoolBlock2 returnStart;
MemoryPoolBlock2 returnEnd;
lock (_syncHeadAndTail)
{
// TODO: leave _pinned intact
returnStart = _head;
returnEnd = iterator.Block;
_head = iterator.Block;
if (_head == null)
{
_tail = null;
SetNotCompleted();
}
else
{
_head.Start = iterator.Index;
}
}
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
}
public struct PinResult
{
public ArraySegment<byte> Data;
public IntPtr DataPtr;
}
}
}

View File

@ -19,7 +19,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
private GCHandle _listenVitality;
public Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback;
public Action<UvStreamHandle, int, Exception, object> _readCallback;
public Action<UvStreamHandle, int, int, Exception, object> _readCallback;
public object _readState;
private GCHandle _readVitality;
@ -72,7 +72,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
public void ReadStart(
Func<UvStreamHandle, int, object, Libuv.uv_buf_t> allocCallback,
Action<UvStreamHandle, int, Exception, object> readCallback,
Action<UvStreamHandle, int, int, Exception, object> readCallback,
object state)
{
if (_readVitality.IsAllocated)
@ -163,11 +163,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
{
Exception error;
stream._uv.Check(nread, out error);
stream._readCallback(stream, 0, error, stream._readState);
stream._readCallback(stream, 0, nread, error, stream._readState);
}
else
{
stream._readCallback(stream, nread, null, stream._readState);
stream._readCallback(stream, nread, 0, null, stream._readState);
}
}
catch (Exception ex)

View File

@ -9,6 +9,7 @@
"Microsoft.AspNet.Hosting": "1.0.0-*",
"Microsoft.Dnx.Runtime.Abstractions": "1.0.0-*",
"Microsoft.Framework.Logging.Abstractions": "1.0.0-*",
"System.Numerics.Vectors": "4.1.1-beta-*",
"Microsoft.StandardsPolice": {
"version": "1.0.0-*",
"type": "build"

View File

@ -14,7 +14,8 @@ namespace Microsoft.AspNet.Server.KestrelTests
[Fact]
public void InitialDictionaryContainsServerAndDate()
{
IDictionary<string, StringValues> headers = new FrameResponseHeaders();
var frame = new Frame(new ConnectionContext());
IDictionary<string, StringValues> headers = frame.ResponseHeaders;
Assert.Equal(2, headers.Count);

View File

@ -0,0 +1,76 @@
using Microsoft.AspNet.Server.Kestrel.Http;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestrelTests
{
public class MemoryPoolBlock2Tests
{
[Fact]
public void IndexOfAnyWorks()
{
using (var pool = new MemoryPool2())
{
var block = pool.Lease(256);
foreach (var ch in Enumerable.Range(0, 256).Select(x => (byte)x))
{
block.Array[block.End++] = ch;
}
var iterator = block.GetIterator();
foreach (var ch in Enumerable.Range(0, 256).Select(x => (char)x))
{
var hit = iterator.IndexOf(ch);
Assert.Equal(ch, iterator.GetLength(hit));
}
}
}
[Fact]
public void GetLengthBetweenIteratorsWorks()
{
using (var pool = new MemoryPool2())
{
var block = pool.Lease(256);
block.End += 256;
TestAllLengths(block, 256);
pool.Return(block);
block = null;
for (var fragment = 0; fragment != 256; fragment += 4)
{
var next = block;
block = pool.Lease(4);
block.Next = next;
block.End += 4;
}
TestAllLengths(block, 256);
while(block != null)
{
var next = block.Next;
pool.Return(block);
block = next;
}
}
}
private void TestAllLengths(MemoryPoolBlock2 block, int lengths)
{
for (var firstIndex = 0; firstIndex <= lengths; ++firstIndex)
{
for (var lastIndex = firstIndex; lastIndex <= lengths; ++lastIndex)
{
var first = block.GetIterator().Add(firstIndex);
var last = block.GetIterator().Add(lastIndex);
Assert.Equal(firstIndex, block.GetIterator().GetLength(first));
Assert.Equal(lastIndex, block.GetIterator().GetLength(last));
Assert.Equal(lastIndex - firstIndex, first.GetLength(last));
}
}
}
}
}

View File

@ -18,7 +18,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
@ -54,7 +54,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
@ -84,7 +84,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
@ -117,7 +117,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
@ -155,7 +155,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);

View File

@ -24,7 +24,6 @@ namespace Microsoft.AspNet.Server.KestrelTests
var stream = new FrameRequestStream(body);
input.Add("Hello", true);
body.Consume();
var buffer1 = new byte[1024];
var count1 = stream.Read(buffer1, 0, 1024);
@ -43,7 +42,6 @@ namespace Microsoft.AspNet.Server.KestrelTests
var stream = new FrameRequestStream(body);
input.Add("Hello", true);
body.Consume();
var buffer1 = new byte[1024];
var count1 = await stream.ReadAsync(buffer1, 0, 1024);

View File

@ -112,7 +112,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
connect.Dispose();
clientConnectionPipe.ReadStart(
(_3, cb, _4) => buf,
(_3, status2, error2, _4) =>
(_3, status2, errCode, error2, _4) =>
{
if (status2 == 0)
{
@ -211,7 +211,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
clientConnectionPipe.ReadStart(
(_3, cb, _4) => buf,
(_3, status2, error2, _4) =>
(_3, status2, errCode2, error2, _4) =>
{
if (status2 == 0)
{
@ -224,7 +224,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var buf2 = loop2.Libuv.buf_init(Marshal.AllocHGlobal(64), 64);
clientConnectionTcp.ReadStart(
(_5, cb, _6) => buf2,
(_5, status3, error3, _6) =>
(_5, status3, errCode3, error3, _6) =>
{
if (status3 == 0)
{

View File

@ -141,7 +141,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => _uv.buf_init(data, 500),
(__, nread, error2, state2) =>
(__, nread, errCode, error2, state2) =>
{
bytesRead += nread;
if (nread == 0)
@ -197,7 +197,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var data = Marshal.AllocCoTaskMem(500);
tcp2.ReadStart(
(a, b, c) => tcp2.Libuv.buf_init(data, 500),
(__, nread, error2, state2) =>
(__, nread, errCode, error2, state2) =>
{
bytesRead += nread;
if (nread == 0)

View File

@ -13,9 +13,10 @@ namespace Microsoft.AspNet.Server.KestrelTests
public TestInput()
{
var memory = new MemoryPool();
var memory2 = new MemoryPool2();
FrameContext = new FrameContext
{
SocketInput = new SocketInput(memory),
SocketInput = new SocketInput(memory2),
Memory = memory,
ConnectionControl = this,
FrameControl = this
@ -28,9 +29,10 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var encoding = System.Text.Encoding.ASCII;
var count = encoding.GetByteCount(text);
var buffer = FrameContext.SocketInput.Available(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Array, buffer.Offset);
FrameContext.SocketInput.Extend(count);
var buffer = FrameContext.SocketInput.Pin(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset);
FrameContext.SocketInput.Unpin(count);
FrameContext.SocketInput.SetCompleted(null);
if (fin)
{
FrameContext.SocketInput.RemoteIntakeFin = true;

View File

@ -171,18 +171,6 @@ using Microsoft.Framework.Primitives;
namespace Microsoft.AspNet.Server.Kestrel.Http
{{
public partial class FrameResponseHeaders
{{
public FrameResponseHeaders()
{{
_Server = ""Kestrel"";
_Date = DateTime.UtcNow.ToString(""r"");
_bits = {
1L << responseHeaders.First(header => header.Name == "Server").Index |
1L << responseHeaders.First(header => header.Name == "Date").Index
}L;
}}
}}
{Each(loops, loop => $@"
public partial class {loop.ClassName}
{{
@ -190,6 +178,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{Each(loop.Headers, header => @"
private StringValues _" + header.Identifier + ";")}
{Each(loop.Headers, header => $@"
public StringValues Header{header.Identifier}
{{
get
{{
if ({header.TestBit()})
{{
return _{header.Identifier};
}}
else
{{
return StringValues.Empty;
}}
}}
set
{{
{header.SetBit()};
_{header.Identifier} = value;
}}
}}
")}
protected override int GetCountFast()
{{
var count = MaybeUnknown?.Count ?? 0;
@ -322,6 +332,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
protected override void ClearFast()
{{
_bits = 0;
{Each(loop.Headers, header => $@"
_{header.Identifier} = StringValues.Empty;")}
MaybeUnknown?.Clear();
}}