diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index 05c1eb1878..72a753a6f9 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -54,7 +54,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize) { - var result = SocketInput.Pin(2048); + var result = SocketInput.IncomingStart(2048); return handle.Libuv.buf_init( result.DataPtr, @@ -68,8 +68,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private void OnRead(UvStreamHandle handle, int readCount, int errorCode, Exception error) { - SocketInput.Unpin(readCount); - var normalRead = readCount != 0 && errorCode == 0; var normalDone = readCount == 0 && (errorCode == 0 || errorCode == Constants.ECONNRESET || errorCode == Constants.EOF); var errorDone = !(normalDone || normalRead); @@ -80,12 +78,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } else if (normalDone || errorDone) { - SocketInput.RemoteIntakeFin = true; _socket.ReadStop(); Log.ConnectionReadFin(_connectionId); } - SocketInput.SetCompleted(errorDone ? error : null); + SocketInput.IncomingComplete(readCount, errorDone ? error : null); } void IConnectionControl.Pause() diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 080aae8d2b..944d9807ec 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -505,52 +505,62 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private bool TakeStartLine(SocketInput input) { - var scan = input.GetIterator(); - - var begin = scan; - if (scan.Seek(' ') == -1) + var scan = input.ConsumingStart(); + var consumed = scan; + try { - return false; - } - var method = begin.GetString(scan); - - scan.Take(); - begin = scan; - var chFound = scan.Seek(' ', '?'); - if (chFound == -1) - { - return false; - } - var requestUri = begin.GetString(scan); - - var queryString = ""; - if (chFound == '?') - { - begin = scan; - if (scan.Seek(' ') != ' ') + var begin = scan; + if (scan.Seek(' ') == -1) { return false; } - queryString = begin.GetString(scan); - } + var method = begin.GetString(scan); - scan.Take(); - begin = scan; - if (scan.Seek('\r') == -1) + scan.Take(); + begin = scan; + var chFound = scan.Seek(' ', '?'); + if (chFound == -1) + { + return false; + } + var requestUri = begin.GetString(scan); + + var queryString = ""; + if (chFound == '?') + { + begin = scan; + if (scan.Seek(' ') != ' ') + { + return false; + } + queryString = begin.GetString(scan); + } + + scan.Take(); + begin = scan; + if (scan.Seek('\r') == -1) + { + return false; + } + var httpVersion = begin.GetString(scan); + + scan.Take(); + if (scan.Take() != '\n') + { + return false; + } + + consumed = scan; + Method = method; + RequestUri = requestUri; + QueryString = queryString; + HttpVersion = httpVersion; + return true; + } + finally { - return false; + input.ConsumingComplete(consumed, scan); } - var httpVersion = begin.GetString(scan); - - scan.Take(); - if (scan.Take() != '\n') return false; - - Method = method; - RequestUri = requestUri; - QueryString = queryString; - HttpVersion = httpVersion; - input.JumpTo(scan); - return true; } static string GetString(ArraySegment range, int startIndex, int endIndex) @@ -560,12 +570,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private bool TakeMessageHeaders(SocketInput input) { - int chFirst; - int chSecond; - var scan = input.GetIterator(); + var scan = input.ConsumingStart(); var consumed = scan; try { + int chFirst; + int chSecond; while (!scan.IsEnd) { var beginName = scan; @@ -642,8 +652,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http value = value.Replace("\r\n", " "); } - _requestHeaders.Append(name.Array, name.Offset, name.Count, value); consumed = scan; + _requestHeaders.Append(name.Array, name.Offset, name.Count, value); break; } } @@ -651,7 +661,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } finally { - input.JumpTo(consumed); + input.ConsumingComplete(consumed, scan); } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 257df299aa..caf528fb5e 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -9,14 +9,36 @@ using Microsoft.Framework.Primitives; namespace Microsoft.AspNet.Server.Kestrel.Http { - public abstract class MessageBody : MessageBodyExchanger + public abstract class MessageBody { - protected MessageBody(FrameContext context) : base(context) + private FrameContext _context; + private int _send100Continue = 1; + + protected MessageBody(FrameContext context) { + _context = context; } public bool RequestKeepAlive { get; protected set; } + public Task ReadAsync(ArraySegment buffer, CancellationToken cancellationToken = default(CancellationToken)) + { + Task result = null; + var send100Continue = 0; + result = ReadAsyncImplementation(buffer, cancellationToken); + if (!result.IsCompleted) + { + send100Continue = Interlocked.Exchange(ref _send100Continue, 0); + } + if (send100Continue == 1) + { + _context.FrameControl.ProduceContinue(); + } + return result; + } + + public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); + public static MessageBody For( string httpVersion, IDictionary headers, @@ -90,10 +112,10 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { await input; - var begin = input.GetIterator(); + var begin = input.ConsumingStart(); int actual; var end = begin.CopyTo(buffer.Array, buffer.Offset, buffer.Count, out actual); - input.JumpTo(end); + input.ConsumingComplete(end, end); if (actual != 0) { @@ -134,11 +156,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http await input; - var begin = input.GetIterator(); + var begin = input.ConsumingStart(); int actual; var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual); _inputLength -= actual; - input.JumpTo(end); + input.ConsumingComplete(end, end); if (actual != 0) { return actual; @@ -198,11 +220,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http await input; } - var begin = input.GetIterator(); + var begin = input.ConsumingStart(); int actual; var end = begin.CopyTo(buffer.Array, buffer.Offset, limit, out actual); _inputLength -= actual; - input.JumpTo(end); + input.ConsumingComplete(end, end); if (_inputLength == 0) { @@ -215,16 +237,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } while (_mode == Mode.ChunkSuffix) { - var scan = input.GetIterator(); + var scan = input.ConsumingStart(); + var consumed = scan; var ch1 = scan.Take(); var ch2 = scan.Take(); if (ch1 == -1 || ch2 == -1) { + input.ConsumingComplete(consumed, scan); await input; } else if (ch1 == '\r' && ch2 == '\n') { - input.JumpTo(scan); + input.ConsumingComplete(scan, scan); _mode = Mode.ChunkPrefix; } else @@ -239,84 +263,92 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut) { - var scan = baton.GetIterator(); - var ch0 = scan.Take(); - var chunkSize = 0; - var mode = 0; - while (ch0 != -1) + var scan = baton.ConsumingStart(); + var consumed = scan; + try { - var ch1 = scan.Take(); - if (ch1 == -1) + var ch0 = scan.Take(); + var chunkSize = 0; + var mode = 0; + while (ch0 != -1) { - return false; - } + var ch1 = scan.Take(); + if (ch1 == -1) + { + return false; + } - if (mode == 0) - { - if (ch0 >= '0' && ch0 <= '9') + if (mode == 0) { - chunkSize = chunkSize * 0x10 + (ch0 - '0'); + if (ch0 >= '0' && ch0 <= '9') + { + chunkSize = chunkSize * 0x10 + (ch0 - '0'); + } + else if (ch0 >= 'A' && ch0 <= 'F') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); + } + else if (ch0 >= 'a' && ch0 <= 'f') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); + } + else + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + mode = 1; } - else if (ch0 >= 'A' && ch0 <= 'F') + else if (mode == 1) { - chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); + if (ch0 >= '0' && ch0 <= '9') + { + chunkSize = chunkSize * 0x10 + (ch0 - '0'); + } + else if (ch0 >= 'A' && ch0 <= 'F') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); + } + else if (ch0 >= 'a' && ch0 <= 'f') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); + } + else if (ch0 == ';') + { + mode = 2; + } + else if (ch0 == '\r' && ch1 == '\n') + { + consumed = scan; + chunkSizeOut = chunkSize; + return true; + } + else + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } } - else if (ch0 >= 'a' && ch0 <= 'f') + else if (mode == 2) { - chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); + if (ch0 == '\r' && ch1 == '\n') + { + consumed = scan; + chunkSizeOut = chunkSize; + return true; + } + else + { + // chunk-extensions not currently parsed + } } - else - { - throw new NotImplementedException("INVALID REQUEST FORMAT"); - } - mode = 1; - } - else if (mode == 1) - { - if (ch0 >= '0' && ch0 <= '9') - { - chunkSize = chunkSize * 0x10 + (ch0 - '0'); - } - else if (ch0 >= 'A' && ch0 <= 'F') - { - chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); - } - else if (ch0 >= 'a' && ch0 <= 'f') - { - chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); - } - else if (ch0 == ';') - { - mode = 2; - } - else if (ch0 == '\r' && ch1 == '\n') - { - baton.JumpTo(scan); - chunkSizeOut = chunkSize; - return true; - } - else - { - throw new NotImplementedException("INVALID REQUEST FORMAT"); - } - } - else if (mode == 2) - { - if (ch0 == '\r' && ch1 == '\n') - { - baton.JumpTo(scan); - chunkSizeOut = chunkSize; - return true; - } - else - { - // chunk-extensions not currently parsed - } - } - ch0 = ch1; + ch0 = ch1; + } + return false; + } + finally + { + baton.ConsumingComplete(consumed, scan); } - return false; } private enum Mode diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs deleted file mode 100644 index f0e72084fb..0000000000 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.AspNet.Server.Kestrel.Http -{ - /// - /// Summary description for MessageBodyExchanger - /// - public class MessageBodyExchanger - { - private static readonly WaitCallback _completePending = CompletePending; - protected readonly FrameContext _context; - - private object _sync = new Object(); - - private ArraySegment _buffer; - private Queue _reads = new Queue(); - private bool _send100Continue = true; - - public MessageBodyExchanger(FrameContext context) - { - _context = context; - _buffer = new ArraySegment(_context.Memory.Empty); - } - - public bool LocalIntakeFin { get; set; } - - public void Transfer(int count, bool fin) - { - if (count == 0 && !fin) - { - return; - } - var input = _context.SocketInput; - lock (_sync) - { - if (_send100Continue) - { - _send100Continue = false; - } - - // NOTE: this should not copy each time - var oldBuffer = _buffer; - var newData = _context.SocketInput.Take(count); - - var newBuffer = new ArraySegment( - _context.Memory.AllocByte(oldBuffer.Count + newData.Count), - 0, - oldBuffer.Count + newData.Count); - - Array.Copy(oldBuffer.Array, oldBuffer.Offset, newBuffer.Array, newBuffer.Offset, oldBuffer.Count); - Array.Copy(newData.Array, newData.Offset, newBuffer.Array, newBuffer.Offset + oldBuffer.Count, newData.Count); - - _buffer = newBuffer; - _context.Memory.FreeByte(oldBuffer.Array); - - if (fin) - { - LocalIntakeFin = true; - } - if (_reads.Count != 0) - { - ThreadPool.QueueUserWorkItem(_completePending, this); - } - } - } - - public Task ReadAsync(ArraySegment buffer) - { - Task result = null; - var send100Continue = false; - while (result == null) - { - while (CompletePending()) - { - // earlier reads have priority - } - lock (_sync) - { - if (_buffer.Count != 0 || buffer.Count == 0 || LocalIntakeFin) - { - // there is data we can take right now - if (_reads.Count != 0) - { - // someone snuck in, try again - continue; - } - - var count = Math.Min(buffer.Count, _buffer.Count); - Array.Copy(_buffer.Array, _buffer.Offset, buffer.Array, buffer.Offset, count); - _buffer = new ArraySegment(_buffer.Array, _buffer.Offset + count, _buffer.Count - count); - result = Task.FromResult(count); - } - else - { - // add ourselves to the line - var tcs = new TaskCompletionSource(); - _reads.Enqueue(new ReadOperation - { - Buffer = buffer, - CompletionSource = tcs, - }); - result = tcs.Task; - send100Continue = _send100Continue; - _send100Continue = false; - } - } - } - if (send100Continue) - { - _context.FrameControl.ProduceContinue(); - } - return result; - } - - public Task ReadAsync(ArraySegment buffer, CancellationToken cancellationToken) - { - Task result = null; - var send100Continue = false; - result = ReadAsyncImplementation(buffer, cancellationToken); - if (!result.IsCompleted) - { - lock (_sync) - { - send100Continue = _send100Continue; - _send100Continue = false; - } - } - if (send100Continue) - { - _context.FrameControl.ProduceContinue(); - } - return result; - } - - public virtual Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) - { - throw new NotImplementedException("TODO"); - } - - static void CompletePending(object state) - { - while (((MessageBodyExchanger)state).CompletePending()) - { - // loop until none left - } - } - - bool CompletePending() - { - ReadOperation read; - int count; - lock (_sync) - { - if (_buffer.Count == 0 && !LocalIntakeFin) - { - return false; - } - if (_reads.Count == 0) - { - return false; - } - read = _reads.Dequeue(); - - count = Math.Min(read.Buffer.Count, _buffer.Count); - Array.Copy(_buffer.Array, _buffer.Offset, read.Buffer.Array, read.Buffer.Offset, count); - _buffer = new ArraySegment(_buffer.Array, _buffer.Offset + count, _buffer.Count - count); - } - if (read.CompletionSource != null) - { - read.CompletionSource.SetResult(count); - } - return true; - } - - public struct ReadOperation - { - public TaskCompletionSource CompletionSource; - public ArraySegment Buffer; - } - } -} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index 1ff2530bc4..03ef803e9c 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.IO; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; @@ -16,6 +17,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private static readonly Action _awaitableIsNotCompleted = () => { }; private readonly MemoryPool2 _memory; + private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false); private Action _awaitableState; private Exception _awaitableError; @@ -23,7 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http private MemoryPoolBlock2 _head; private MemoryPoolBlock2 _tail; private MemoryPoolBlock2 _pinned; - private readonly object _syncHeadAndTail = new Object(); + private readonly object _sync = new Object(); public SocketInput(MemoryPool2 memory) { @@ -55,16 +57,16 @@ namespace Microsoft.AspNet.Server.Kestrel.Http return taken; } - public PinResult Pin(int minimumSize) + public IncomingBuffer IncomingStart(int minimumSize) { - lock (_syncHeadAndTail) + lock (_sync) { if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End) { _pinned = _tail; var data = new ArraySegment(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End); var dataPtr = _pinned.Pin(); - return new PinResult + return new IncomingBuffer { Data = data, DataPtr = dataPtr, @@ -73,19 +75,21 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } _pinned = _memory.Lease(minimumSize); - return new PinResult + return new IncomingBuffer { Data = _pinned.Data, DataPtr = _pinned.Pin() }; } - public void Unpin(int count) + public void IncomingComplete(int count, Exception error) { - // Unpin may called without an earlier Pin - if (_pinned != null) + Action awaitableState; + + lock (_sync) { - lock (_syncHeadAndTail) + // Unpin may called without an earlier Pin + if (_pinned != null) { _pinned.End += count; if (_head == null) @@ -103,6 +107,71 @@ namespace Microsoft.AspNet.Server.Kestrel.Http } } _pinned = null; + + if (count == 0) + { + RemoteIntakeFin = true; + } + if (error != null) + { + _awaitableError = error; + } + + awaitableState = Interlocked.Exchange( + ref _awaitableState, + _awaitableIsCompleted); + + _manualResetEvent.Set(); + } + + if (awaitableState != _awaitableIsCompleted && + awaitableState != _awaitableIsNotCompleted) + { + Task.Run(awaitableState); + } + } + + public MemoryPoolBlock2.Iterator ConsumingStart() + { + lock (_sync) + { + return new MemoryPoolBlock2.Iterator(_head); + } + } + + public void ConsumingComplete( + MemoryPoolBlock2.Iterator consumed, + MemoryPoolBlock2.Iterator examined) + { + MemoryPoolBlock2 returnStart = null; + MemoryPoolBlock2 returnEnd = null; + lock (_sync) + { + if (!consumed.IsDefault) + { + returnStart = _head; + returnEnd = consumed.Block; + _head = consumed.Block; + _head.Start = consumed.Index; + } + if (!examined.IsDefault && + examined.IsEnd && + RemoteIntakeFin == false && + _awaitableError == null) + { + _manualResetEvent.Reset(); + + var awaitableState = Interlocked.CompareExchange( + ref _awaitableState, + _awaitableIsNotCompleted, + _awaitableIsCompleted); + } + } + while (returnStart != returnEnd) + { + var returnBlock = returnStart; + returnStart = returnStart.Next; + returnBlock.Pool.Return(returnBlock); } } @@ -111,7 +180,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Http return this; } - public void OnCompleted(Action continuation) { var awaitableState = Interlocked.CompareExchange( @@ -138,94 +206,20 @@ namespace Microsoft.AspNet.Server.Kestrel.Http OnCompleted(continuation); } - public void SetCompleted(Exception error) - { - if (error != null) - { - _awaitableError = error; - } - - var awaitableState = Interlocked.Exchange( - ref _awaitableState, - _awaitableIsCompleted); - - if (awaitableState != _awaitableIsCompleted && - awaitableState != _awaitableIsNotCompleted) - { - Task.Run(awaitableState); - } - } - - public void SetNotCompleted() - { - if (RemoteIntakeFin || _awaitableError != null) - { - // TODO: Race condition - setting either of these can leave awaitable not completed - return; - } - var awaitableState = Interlocked.CompareExchange( - ref _awaitableState, - _awaitableIsNotCompleted, - _awaitableIsCompleted); - - if (awaitableState == _awaitableIsNotCompleted) - { - return; - } - else if (awaitableState == _awaitableIsCompleted) - { - return; - } - else - { - // THIS IS AN ERROR STATE - ONLY ONE WAITER MAY EXIST - } - } - public void GetResult() { + if (!IsCompleted) + { + _manualResetEvent.Wait(); + } var error = _awaitableError; if (error != null) { - throw new AggregateException(error); + throw new IOException(error.Message, error); } } - public MemoryPoolBlock2.Iterator GetIterator() - { - lock (_syncHeadAndTail) - { - return new MemoryPoolBlock2.Iterator(_head); - } - } - - public void JumpTo(MemoryPoolBlock2.Iterator iterator) - { - MemoryPoolBlock2 returnStart; - MemoryPoolBlock2 returnEnd; - lock (_syncHeadAndTail) - { - // TODO: leave _pinned intact - // TODO: return when empty - - returnStart = _head; - returnEnd = iterator.Block; - _head = iterator.Block; - _head.Start = iterator.Index; - if (iterator.IsEnd) - { - SetNotCompleted(); - } - } - while (returnStart != returnEnd) - { - var returnBlock = returnStart; - returnStart = returnStart.Next; - returnBlock.Pool.Return(returnBlock); - } - } - - public struct PinResult + public struct IncomingBuffer { public ArraySegment Data; public IntPtr DataPtr; diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MessageBodyExchangerTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MessageBodyExchangerTests.cs deleted file mode 100644 index 9983e3a3c7..0000000000 --- a/test/Microsoft.AspNet.Server.KestrelTests/MessageBodyExchangerTests.cs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using Microsoft.AspNet.Server.Kestrel.Http; -using System; -using System.Threading.Tasks; -using Xunit; - -namespace Microsoft.AspNet.Server.KestrelTests -{ - /// - /// Summary description for MessageBodyExchangerTests - /// - public class MessageBodyExchangerTests - { - [Fact] - public async Task CallingReadAsyncBeforeTransfer() - { - var testInput = new TestInput(); - var context = new ConnectionContext(); - context.SocketInput = new SocketInput(new MemoryPool2()); - - var exchanger = new MessageBodyExchanger(testInput.FrameContext); - - var buffer1 = new byte[1024]; - var buffer2 = new byte[1024]; - var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); - var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); - - Assert.False(task1.IsCompleted); - Assert.False(task2.IsCompleted); - - testInput.Add("Hello"); - - exchanger.Transfer(3, false); - - var count1 = await task1; - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); - - exchanger.Transfer(2, false); - - var count2 = await task2; - - Assert.True(task1.IsCompleted); - Assert.True(task2.IsCompleted); - AssertASCII("lo", new ArraySegment(buffer2, 0, count2)); - } - - [Fact] - public async Task CallingTransferBeforeReadAsync() - { - var testInput = new TestInput(); - var context = new ConnectionContext(); - context.SocketInput = new SocketInput(new MemoryPool2()); - - var exchanger = new MessageBodyExchanger(testInput.FrameContext); - - testInput.Add("Hello"); - - exchanger.Transfer(5, false); - - var buffer1 = new byte[1024]; - var buffer2 = new byte[1024]; - var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); - var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - - await task1; - - var count1 = await task1; - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - AssertASCII("Hello", new ArraySegment(buffer1, 0, count1)); - } - - [Fact] - public async Task TransferZeroBytesDoesNotReleaseReadAsync() - { - var testInput = new TestInput(); - var context = new ConnectionContext(); - context.SocketInput = new SocketInput(new MemoryPool2()); - - var exchanger = new MessageBodyExchanger(testInput.FrameContext); - - var buffer1 = new byte[1024]; - var buffer2 = new byte[1024]; - var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); - var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); - - Assert.False(task1.IsCompleted); - Assert.False(task2.IsCompleted); - - testInput.Add("Hello"); - - exchanger.Transfer(3, false); - - var count1 = await task1; - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); - - exchanger.Transfer(0, false); - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - } - - [Fact] - public async Task TransferFinDoesReleaseReadAsync() - { - var testInput = new TestInput(); - var context = new ConnectionContext(); - context.SocketInput = new SocketInput(new MemoryPool2()); - - var exchanger = new MessageBodyExchanger(testInput.FrameContext); - - var buffer1 = new byte[1024]; - var buffer2 = new byte[1024]; - var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); - var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); - - Assert.False(task1.IsCompleted); - Assert.False(task2.IsCompleted); - - testInput.Add("Hello"); - - exchanger.Transfer(3, false); - - var count1 = await task1; - - Assert.True(task1.IsCompleted); - Assert.False(task2.IsCompleted); - AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); - - exchanger.Transfer(0, true); - - var count2 = await task2; - - Assert.True(task1.IsCompleted); - Assert.True(task2.IsCompleted); - Assert.Equal(0, count2); - } - - - [Fact] - public async Task TransferFinFirstDoesReturnsCompletedReadAsyncs() - { - - var testInput = new TestInput(); - var context = new ConnectionContext(); - context.SocketInput = new SocketInput(new MemoryPool2()); - - var exchanger = new MessageBodyExchanger(testInput.FrameContext); - - testInput.Add("Hello"); - - exchanger.Transfer(5, true); - - var buffer1 = new byte[1024]; - var buffer2 = new byte[1024]; - var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); - var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); - - Assert.True(task1.IsCompleted); - Assert.True(task2.IsCompleted); - - var count1 = await task1; - var count2 = await task2; - - AssertASCII("Hello", new ArraySegment(buffer1, 0, count1)); - Assert.Equal(0, count2); - } - - private void AssertASCII(string expected, ArraySegment actual) - { - var encoding = System.Text.Encoding.ASCII; - var bytes = encoding.GetBytes(expected); - Assert.Equal(bytes.Length, actual.Count); - for (var index = 0; index != bytes.Length; ++index) - { - Assert.Equal(bytes[index], actual.Array[actual.Offset + index]); - } - } - } -} \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestConnection.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestConnection.cs index 773bddf70d..c881979f52 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestConnection.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestConnection.cs @@ -71,7 +71,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var task = _reader.ReadAsync(actual, offset, actual.Length - offset); if (!Debugger.IsAttached) { - Assert.True(task.Wait(1000), "timeout"); + Assert.True(task.Wait(4000), "timeout"); } var count = await task; if (count == 0) @@ -95,7 +95,7 @@ namespace Microsoft.AspNet.Server.KestrelTests var task = _reader.ReadAsync(actual, offset, 1); if (!Debugger.IsAttached) { - Assert.True(task.Wait(1000), "timeout"); + Assert.True(task.Wait(4000), "timeout"); } var count = await task; if (count == 0) diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index 506e1907ac..63f55cab03 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -29,10 +29,9 @@ namespace Microsoft.AspNet.Server.KestrelTests { var encoding = System.Text.Encoding.ASCII; var count = encoding.GetByteCount(text); - var buffer = FrameContext.SocketInput.Pin(text.Length); + var buffer = FrameContext.SocketInput.IncomingStart(text.Length); count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset); - FrameContext.SocketInput.Unpin(count); - FrameContext.SocketInput.SetCompleted(null); + FrameContext.SocketInput.IncomingComplete(count, null); if (fin) { FrameContext.SocketInput.RemoteIntakeFin = true; diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestLogger.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestLogger.cs index e643c1d0f9..e8658f7615 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestLogger.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestLogger.cs @@ -13,17 +13,17 @@ namespace Microsoft.AspNet.Server.KestrelTests public override void ConnectionRead(long connectionId, int count) { - _logger.LogDebug(1, @"Connection id ""{ConnectionId}"" recv {count} bytes.", connectionId, count); + //_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" recv {count} bytes.", connectionId, count); } public override void ConnectionWrite(long connectionId, int count) { - _logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send {count} bytes.", connectionId, count); + //_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send {count} bytes.", connectionId, count); } public override void ConnectionWriteCallback(long connectionId, int status) { - _logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send finished with status {status}.", connectionId, status); + //_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" send finished with status {status}.", connectionId, status); } public class TestLogger : ILogger