Removing MessageBodyExchanger base class

This commit is contained in:
Louis DeJardin 2015-09-17 18:17:26 -07:00
parent 7917569466
commit d3a87c4c14
9 changed files with 256 additions and 604 deletions

View File

@ -54,7 +54,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = SocketInput.Pin(2048);
var result = SocketInput.IncomingStart(2048);
return handle.Libuv.buf_init(
result.DataPtr,
@ -68,8 +68,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private void OnRead(UvStreamHandle handle, int readCount, int errorCode, Exception error)
{
SocketInput.Unpin(readCount);
var normalRead = readCount != 0 && errorCode == 0;
var normalDone = readCount == 0 && (errorCode == 0 || errorCode == Constants.ECONNRESET || errorCode == Constants.EOF);
var errorDone = !(normalDone || normalRead);
@ -80,12 +78,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
else if (normalDone || errorDone)
{
SocketInput.RemoteIntakeFin = true;
_socket.ReadStop();
Log.ConnectionReadFin(_connectionId);
}
SocketInput.SetCompleted(errorDone ? error : null);
SocketInput.IncomingComplete(readCount, errorDone ? error : null);
}
void IConnectionControl.Pause()

View File

@ -505,52 +505,62 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private bool TakeStartLine(SocketInput input)
{
var scan = input.GetIterator();
var begin = scan;
if (scan.Seek(' ') == -1)
var scan = input.ConsumingStart();
var consumed = scan;
try
{
return false;
}
var method = begin.GetString(scan);
scan.Take();
begin = scan;
var chFound = scan.Seek(' ', '?');
if (chFound == -1)
{
return false;
}
var requestUri = begin.GetString(scan);
var queryString = "";
if (chFound == '?')
{
begin = scan;
if (scan.Seek(' ') != ' ')
var begin = scan;
if (scan.Seek(' ') == -1)
{
return false;
}
queryString = begin.GetString(scan);
}
var method = begin.GetString(scan);
scan.Take();
begin = scan;
if (scan.Seek('\r') == -1)
scan.Take();
begin = scan;
var chFound = scan.Seek(' ', '?');
if (chFound == -1)
{
return false;
}
var requestUri = begin.GetString(scan);
var queryString = "";
if (chFound == '?')
{
begin = scan;
if (scan.Seek(' ') != ' ')
{
return false;
}
queryString = begin.GetString(scan);
}
scan.Take();
begin = scan;
if (scan.Seek('\r') == -1)
{
return false;
}
var httpVersion = begin.GetString(scan);
scan.Take();
if (scan.Take() != '\n')
{
return false;
}
consumed = scan;
Method = method;
RequestUri = requestUri;
QueryString = queryString;
HttpVersion = httpVersion;
return true;
}
finally
{
return false;
input.ConsumingComplete(consumed, scan);
}
var httpVersion = begin.GetString(scan);
scan.Take();
if (scan.Take() != '\n') return false;
Method = method;
RequestUri = requestUri;
QueryString = queryString;
HttpVersion = httpVersion;
input.JumpTo(scan);
return true;
}
static string GetString(ArraySegment<byte> range, int startIndex, int endIndex)
@ -560,12 +570,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private bool TakeMessageHeaders(SocketInput input)
{
int chFirst;
int chSecond;
var scan = input.GetIterator();
var scan = input.ConsumingStart();
var consumed = scan;
try
{
int chFirst;
int chSecond;
while (!scan.IsEnd)
{
var beginName = scan;
@ -642,8 +652,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
value = value.Replace("\r\n", " ");
}
_requestHeaders.Append(name.Array, name.Offset, name.Count, value);
consumed = scan;
_requestHeaders.Append(name.Array, name.Offset, name.Count, value);
break;
}
}
@ -651,7 +661,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
finally
{
input.JumpTo(consumed);
input.ConsumingComplete(consumed, scan);
}
}

View File

@ -9,14 +9,36 @@ using Microsoft.Framework.Primitives;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
public abstract class MessageBody : MessageBodyExchanger
public abstract class MessageBody
{
protected MessageBody(FrameContext context) : base(context)
private FrameContext _context;
private int _send100Continue = 1;
protected MessageBody(FrameContext context)
{
_context = context;
}
public bool RequestKeepAlive { get; protected set; }
public Task<int> ReadAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
{
Task<int> result = null;
var send100Continue = 0;
result = ReadAsyncImplementation(buffer, cancellationToken);
if (!result.IsCompleted)
{
send100Continue = Interlocked.Exchange(ref _send100Continue, 0);
}
if (send100Continue == 1)
{
_context.FrameControl.ProduceContinue();
}
return result;
}
public abstract Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken);
public static MessageBody For(
string httpVersion,
IDictionary<string, StringValues> headers,
@ -90,10 +112,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
await input;
var begin = input.GetIterator();
var begin = input.ConsumingStart();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, buffer.Count, out actual);
input.JumpTo(end);
input.ConsumingComplete(end, end);
if (actual != 0)
{
@ -134,11 +156,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
await input;
var begin = input.GetIterator();
var begin = input.ConsumingStart();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual);
_inputLength -= actual;
input.JumpTo(end);
input.ConsumingComplete(end, end);
if (actual != 0)
{
return actual;
@ -198,11 +220,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
await input;
}
var begin = input.GetIterator();
var begin = input.ConsumingStart();
int actual;
var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual);
_inputLength -= actual;
input.JumpTo(end);
input.ConsumingComplete(end, end);
if (_inputLength == 0)
{
@ -215,16 +237,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
while (_mode == Mode.ChunkSuffix)
{
var scan = input.GetIterator();
var scan = input.ConsumingStart();
var consumed = scan;
var ch1 = scan.Take();
var ch2 = scan.Take();
if (ch1 == -1 || ch2 == -1)
{
input.ConsumingComplete(consumed, scan);
await input;
}
else if (ch1 == '\r' && ch2 == '\n')
{
input.JumpTo(scan);
input.ConsumingComplete(scan, scan);
_mode = Mode.ChunkPrefix;
}
else
@ -239,84 +263,92 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut)
{
var scan = baton.GetIterator();
var ch0 = scan.Take();
var chunkSize = 0;
var mode = 0;
while (ch0 != -1)
var scan = baton.ConsumingStart();
var consumed = scan;
try
{
var ch1 = scan.Take();
if (ch1 == -1)
var ch0 = scan.Take();
var chunkSize = 0;
var mode = 0;
while (ch0 != -1)
{
return false;
}
var ch1 = scan.Take();
if (ch1 == -1)
{
return false;
}
if (mode == 0)
{
if (ch0 >= '0' && ch0 <= '9')
if (mode == 0)
{
chunkSize = chunkSize * 0x10 + (ch0 - '0');
if (ch0 >= '0' && ch0 <= '9')
{
chunkSize = chunkSize * 0x10 + (ch0 - '0');
}
else if (ch0 >= 'A' && ch0 <= 'F')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10));
}
else if (ch0 >= 'a' && ch0 <= 'f')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10));
}
else
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
mode = 1;
}
else if (ch0 >= 'A' && ch0 <= 'F')
else if (mode == 1)
{
chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10));
if (ch0 >= '0' && ch0 <= '9')
{
chunkSize = chunkSize * 0x10 + (ch0 - '0');
}
else if (ch0 >= 'A' && ch0 <= 'F')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10));
}
else if (ch0 >= 'a' && ch0 <= 'f')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10));
}
else if (ch0 == ';')
{
mode = 2;
}
else if (ch0 == '\r' && ch1 == '\n')
{
consumed = scan;
chunkSizeOut = chunkSize;
return true;
}
else
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
}
else if (ch0 >= 'a' && ch0 <= 'f')
else if (mode == 2)
{
chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10));
if (ch0 == '\r' && ch1 == '\n')
{
consumed = scan;
chunkSizeOut = chunkSize;
return true;
}
else
{
// chunk-extensions not currently parsed
}
}
else
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
mode = 1;
}
else if (mode == 1)
{
if (ch0 >= '0' && ch0 <= '9')
{
chunkSize = chunkSize * 0x10 + (ch0 - '0');
}
else if (ch0 >= 'A' && ch0 <= 'F')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10));
}
else if (ch0 >= 'a' && ch0 <= 'f')
{
chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10));
}
else if (ch0 == ';')
{
mode = 2;
}
else if (ch0 == '\r' && ch1 == '\n')
{
baton.JumpTo(scan);
chunkSizeOut = chunkSize;
return true;
}
else
{
throw new NotImplementedException("INVALID REQUEST FORMAT");
}
}
else if (mode == 2)
{
if (ch0 == '\r' && ch1 == '\n')
{
baton.JumpTo(scan);
chunkSizeOut = chunkSize;
return true;
}
else
{
// chunk-extensions not currently parsed
}
}
ch0 = ch1;
ch0 = ch1;
}
return false;
}
finally
{
baton.ConsumingComplete(consumed, scan);
}
return false;
}
private enum Mode

View File

@ -1,188 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.AspNet.Server.Kestrel.Http
{
/// <summary>
/// Summary description for MessageBodyExchanger
/// </summary>
public class MessageBodyExchanger
{
private static readonly WaitCallback _completePending = CompletePending;
protected readonly FrameContext _context;
private object _sync = new Object();
private ArraySegment<byte> _buffer;
private Queue<ReadOperation> _reads = new Queue<ReadOperation>();
private bool _send100Continue = true;
public MessageBodyExchanger(FrameContext context)
{
_context = context;
_buffer = new ArraySegment<byte>(_context.Memory.Empty);
}
public bool LocalIntakeFin { get; set; }
public void Transfer(int count, bool fin)
{
if (count == 0 && !fin)
{
return;
}
var input = _context.SocketInput;
lock (_sync)
{
if (_send100Continue)
{
_send100Continue = false;
}
// NOTE: this should not copy each time
var oldBuffer = _buffer;
var newData = _context.SocketInput.Take(count);
var newBuffer = new ArraySegment<byte>(
_context.Memory.AllocByte(oldBuffer.Count + newData.Count),
0,
oldBuffer.Count + newData.Count);
Array.Copy(oldBuffer.Array, oldBuffer.Offset, newBuffer.Array, newBuffer.Offset, oldBuffer.Count);
Array.Copy(newData.Array, newData.Offset, newBuffer.Array, newBuffer.Offset + oldBuffer.Count, newData.Count);
_buffer = newBuffer;
_context.Memory.FreeByte(oldBuffer.Array);
if (fin)
{
LocalIntakeFin = true;
}
if (_reads.Count != 0)
{
ThreadPool.QueueUserWorkItem(_completePending, this);
}
}
}
public Task<int> ReadAsync(ArraySegment<byte> buffer)
{
Task<int> result = null;
var send100Continue = false;
while (result == null)
{
while (CompletePending())
{
// earlier reads have priority
}
lock (_sync)
{
if (_buffer.Count != 0 || buffer.Count == 0 || LocalIntakeFin)
{
// there is data we can take right now
if (_reads.Count != 0)
{
// someone snuck in, try again
continue;
}
var count = Math.Min(buffer.Count, _buffer.Count);
Array.Copy(_buffer.Array, _buffer.Offset, buffer.Array, buffer.Offset, count);
_buffer = new ArraySegment<byte>(_buffer.Array, _buffer.Offset + count, _buffer.Count - count);
result = Task.FromResult(count);
}
else
{
// add ourselves to the line
var tcs = new TaskCompletionSource<int>();
_reads.Enqueue(new ReadOperation
{
Buffer = buffer,
CompletionSource = tcs,
});
result = tcs.Task;
send100Continue = _send100Continue;
_send100Continue = false;
}
}
}
if (send100Continue)
{
_context.FrameControl.ProduceContinue();
}
return result;
}
public Task<int> ReadAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
Task<int> result = null;
var send100Continue = false;
result = ReadAsyncImplementation(buffer, cancellationToken);
if (!result.IsCompleted)
{
lock (_sync)
{
send100Continue = _send100Continue;
_send100Continue = false;
}
}
if (send100Continue)
{
_context.FrameControl.ProduceContinue();
}
return result;
}
public virtual Task<int> ReadAsyncImplementation(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
throw new NotImplementedException("TODO");
}
static void CompletePending(object state)
{
while (((MessageBodyExchanger)state).CompletePending())
{
// loop until none left
}
}
bool CompletePending()
{
ReadOperation read;
int count;
lock (_sync)
{
if (_buffer.Count == 0 && !LocalIntakeFin)
{
return false;
}
if (_reads.Count == 0)
{
return false;
}
read = _reads.Dequeue();
count = Math.Min(read.Buffer.Count, _buffer.Count);
Array.Copy(_buffer.Array, _buffer.Offset, read.Buffer.Array, read.Buffer.Offset, count);
_buffer = new ArraySegment<byte>(_buffer.Array, _buffer.Offset + count, _buffer.Count - count);
}
if (read.CompletionSource != null)
{
read.CompletionSource.SetResult(count);
}
return true;
}
public struct ReadOperation
{
public TaskCompletionSource<int> CompletionSource;
public ArraySegment<byte> Buffer;
}
}
}

View File

@ -3,6 +3,7 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
@ -16,6 +17,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private static readonly Action _awaitableIsNotCompleted = () => { };
private readonly MemoryPool2 _memory;
private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false);
private Action _awaitableState;
private Exception _awaitableError;
@ -23,7 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private MemoryPoolBlock2 _head;
private MemoryPoolBlock2 _tail;
private MemoryPoolBlock2 _pinned;
private readonly object _syncHeadAndTail = new Object();
private readonly object _sync = new Object();
public SocketInput(MemoryPool2 memory)
{
@ -55,16 +57,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return taken;
}
public PinResult Pin(int minimumSize)
public IncomingBuffer IncomingStart(int minimumSize)
{
lock (_syncHeadAndTail)
lock (_sync)
{
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
{
_pinned = _tail;
var data = new ArraySegment<byte>(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End);
var dataPtr = _pinned.Pin();
return new PinResult
return new IncomingBuffer
{
Data = data,
DataPtr = dataPtr,
@ -73,19 +75,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
_pinned = _memory.Lease(minimumSize);
return new PinResult
return new IncomingBuffer
{
Data = _pinned.Data,
DataPtr = _pinned.Pin()
};
}
public void Unpin(int count)
public void IncomingComplete(int count, Exception error)
{
// Unpin may called without an earlier Pin
if (_pinned != null)
Action awaitableState;
lock (_sync)
{
lock (_syncHeadAndTail)
// Unpin may called without an earlier Pin
if (_pinned != null)
{
_pinned.End += count;
if (_head == null)
@ -103,6 +107,71 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
_pinned = null;
if (count == 0)
{
RemoteIntakeFin = true;
}
if (error != null)
{
_awaitableError = error;
}
awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
_manualResetEvent.Set();
}
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
Task.Run(awaitableState);
}
}
public MemoryPoolBlock2.Iterator ConsumingStart()
{
lock (_sync)
{
return new MemoryPoolBlock2.Iterator(_head);
}
}
public void ConsumingComplete(
MemoryPoolBlock2.Iterator consumed,
MemoryPoolBlock2.Iterator examined)
{
MemoryPoolBlock2 returnStart = null;
MemoryPoolBlock2 returnEnd = null;
lock (_sync)
{
if (!consumed.IsDefault)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
}
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
}
@ -111,7 +180,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return this;
}
public void OnCompleted(Action continuation)
{
var awaitableState = Interlocked.CompareExchange(
@ -138,94 +206,20 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
OnCompleted(continuation);
}
public void SetCompleted(Exception error)
{
if (error != null)
{
_awaitableError = error;
}
var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
Task.Run(awaitableState);
}
}
public void SetNotCompleted()
{
if (RemoteIntakeFin || _awaitableError != null)
{
// TODO: Race condition - setting either of these can leave awaitable not completed
return;
}
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
if (awaitableState == _awaitableIsNotCompleted)
{
return;
}
else if (awaitableState == _awaitableIsCompleted)
{
return;
}
else
{
// THIS IS AN ERROR STATE - ONLY ONE WAITER MAY EXIST
}
}
public void GetResult()
{
if (!IsCompleted)
{
_manualResetEvent.Wait();
}
var error = _awaitableError;
if (error != null)
{
throw new AggregateException(error);
throw new IOException(error.Message, error);
}
}
public MemoryPoolBlock2.Iterator GetIterator()
{
lock (_syncHeadAndTail)
{
return new MemoryPoolBlock2.Iterator(_head);
}
}
public void JumpTo(MemoryPoolBlock2.Iterator iterator)
{
MemoryPoolBlock2 returnStart;
MemoryPoolBlock2 returnEnd;
lock (_syncHeadAndTail)
{
// TODO: leave _pinned intact
// TODO: return when empty
returnStart = _head;
returnEnd = iterator.Block;
_head = iterator.Block;
_head.Start = iterator.Index;
if (iterator.IsEnd)
{
SetNotCompleted();
}
}
while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool.Return(returnBlock);
}
}
public struct PinResult
public struct IncomingBuffer
{
public ArraySegment<byte> Data;
public IntPtr DataPtr;

View File

@ -1,192 +0,0 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Microsoft.AspNet.Server.Kestrel.Http;
using System;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestrelTests
{
/// <summary>
/// Summary description for MessageBodyExchangerTests
/// </summary>
public class MessageBodyExchangerTests
{
[Fact]
public async Task CallingReadAsyncBeforeTransfer()
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
var task1 = exchanger.ReadAsync(new ArraySegment<byte>(buffer1));
var task2 = exchanger.ReadAsync(new ArraySegment<byte>(buffer2));
Assert.False(task1.IsCompleted);
Assert.False(task2.IsCompleted);
testInput.Add("Hello");
exchanger.Transfer(3, false);
var count1 = await task1;
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
AssertASCII("Hel", new ArraySegment<byte>(buffer1, 0, count1));
exchanger.Transfer(2, false);
var count2 = await task2;
Assert.True(task1.IsCompleted);
Assert.True(task2.IsCompleted);
AssertASCII("lo", new ArraySegment<byte>(buffer2, 0, count2));
}
[Fact]
public async Task CallingTransferBeforeReadAsync()
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
testInput.Add("Hello");
exchanger.Transfer(5, false);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
var task1 = exchanger.ReadAsync(new ArraySegment<byte>(buffer1));
var task2 = exchanger.ReadAsync(new ArraySegment<byte>(buffer2));
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
await task1;
var count1 = await task1;
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
AssertASCII("Hello", new ArraySegment<byte>(buffer1, 0, count1));
}
[Fact]
public async Task TransferZeroBytesDoesNotReleaseReadAsync()
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
var task1 = exchanger.ReadAsync(new ArraySegment<byte>(buffer1));
var task2 = exchanger.ReadAsync(new ArraySegment<byte>(buffer2));
Assert.False(task1.IsCompleted);
Assert.False(task2.IsCompleted);
testInput.Add("Hello");
exchanger.Transfer(3, false);
var count1 = await task1;
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
AssertASCII("Hel", new ArraySegment<byte>(buffer1, 0, count1));
exchanger.Transfer(0, false);
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
}
[Fact]
public async Task TransferFinDoesReleaseReadAsync()
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
var task1 = exchanger.ReadAsync(new ArraySegment<byte>(buffer1));
var task2 = exchanger.ReadAsync(new ArraySegment<byte>(buffer2));
Assert.False(task1.IsCompleted);
Assert.False(task2.IsCompleted);
testInput.Add("Hello");
exchanger.Transfer(3, false);
var count1 = await task1;
Assert.True(task1.IsCompleted);
Assert.False(task2.IsCompleted);
AssertASCII("Hel", new ArraySegment<byte>(buffer1, 0, count1));
exchanger.Transfer(0, true);
var count2 = await task2;
Assert.True(task1.IsCompleted);
Assert.True(task2.IsCompleted);
Assert.Equal(0, count2);
}
[Fact]
public async Task TransferFinFirstDoesReturnsCompletedReadAsyncs()
{
var testInput = new TestInput();
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool2());
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
testInput.Add("Hello");
exchanger.Transfer(5, true);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
var task1 = exchanger.ReadAsync(new ArraySegment<byte>(buffer1));
var task2 = exchanger.ReadAsync(new ArraySegment<byte>(buffer2));
Assert.True(task1.IsCompleted);
Assert.True(task2.IsCompleted);
var count1 = await task1;
var count2 = await task2;
AssertASCII("Hello", new ArraySegment<byte>(buffer1, 0, count1));
Assert.Equal(0, count2);
}
private void AssertASCII(string expected, ArraySegment<byte> actual)
{
var encoding = System.Text.Encoding.ASCII;
var bytes = encoding.GetBytes(expected);
Assert.Equal(bytes.Length, actual.Count);
for (var index = 0; index != bytes.Length; ++index)
{
Assert.Equal(bytes[index], actual.Array[actual.Offset + index]);
}
}
}
}

View File

@ -71,7 +71,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var task = _reader.ReadAsync(actual, offset, actual.Length - offset);
if (!Debugger.IsAttached)
{
Assert.True(task.Wait(1000), "timeout");
Assert.True(task.Wait(4000), "timeout");
}
var count = await task;
if (count == 0)
@ -95,7 +95,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
var task = _reader.ReadAsync(actual, offset, 1);
if (!Debugger.IsAttached)
{
Assert.True(task.Wait(1000), "timeout");
Assert.True(task.Wait(4000), "timeout");
}
var count = await task;
if (count == 0)

View File

@ -29,10 +29,9 @@ namespace Microsoft.AspNet.Server.KestrelTests
{
var encoding = System.Text.Encoding.ASCII;
var count = encoding.GetByteCount(text);
var buffer = FrameContext.SocketInput.Pin(text.Length);
var buffer = FrameContext.SocketInput.IncomingStart(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset);
FrameContext.SocketInput.Unpin(count);
FrameContext.SocketInput.SetCompleted(null);
FrameContext.SocketInput.IncomingComplete(count, null);
if (fin)
{
FrameContext.SocketInput.RemoteIntakeFin = true;

View File

@ -13,17 +13,17 @@ namespace Microsoft.AspNet.Server.KestrelTests
public override void ConnectionRead(long connectionId, int count)
{
_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" recv {count} bytes.", connectionId, count);
//_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" recv {count} bytes.", connectionId, count);
}
public override void ConnectionWrite(long connectionId, int count)
{
_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send {count} bytes.", connectionId, count);
//_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send {count} bytes.", connectionId, count);
}
public override void ConnectionWriteCallback(long connectionId, int status)
{
_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send finished with status {status}.", connectionId, status);
//_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send finished with status {status}.", connectionId, status);
}
public class TestLogger : ILogger