Trying to get 100 Continue working again

This commit is contained in:
Louis DeJardin 2014-06-08 01:03:56 -07:00
parent 044bbb83e6
commit ebd6af0fd6
14 changed files with 390 additions and 184 deletions

View File

@ -17,6 +17,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
}
public ConnectionContext(ConnectionContext context) : base(context)
{
SocketInput = context.SocketInput;
SocketOutput = context.SocketOutput;
ConnectionControl = context.ConnectionControl;
}
public SocketInput SocketInput { get; set; }
public ISocketOutput SocketOutput { get; set; }

View File

@ -20,10 +20,29 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
ConnectionKeepAlive,
}
public class Frame
public class FrameContext : ConnectionContext
{
private ConnectionContext _context;
public FrameContext()
{
}
public FrameContext(ConnectionContext context) : base(context)
{
}
public IFrameControl FrameControl { get; set; }
}
public interface IFrameControl
{
void ProduceContinue();
void Write(ArraySegment<byte> data, Action<object> callback, object state);
}
public class Frame : FrameContext, IFrameControl
{
Mode _mode;
enum Mode
@ -64,10 +83,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Task _upgradeTask = _completedTask;
static readonly Task _completedTask = Task.FromResult(0);
public Frame(ConnectionContext context)
public Frame(ConnectionContext context) : base(context)
{
_context = context;
FrameControl = this;
}
/*
public bool LocalIntakeFin
{
@ -81,7 +101,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
*/
public void Consume()
{
var input = _context.SocketInput;
var input = SocketInput;
for (; ;)
{
switch (_mode)
@ -146,46 +166,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
}
}
Action<Frame, Exception> HandleExpectContinue(Action<Frame, Exception> continuation)
{
string[] expect;
if (_httpVersion.Equals("HTTP/1.1") &&
_requestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
return (frame, error) =>
{
if (_resultStarted)
{
continuation.Invoke(frame, error);
}
else
{
var bytes = Encoding.Default.GetBytes("HTTP/1.1 100 Continue\r\n\r\n");
//var isasync = _context.SocketOutput.Write(
// new ArraySegment<byte>(bytes),
// error2 => continuation(frame, error2));
//if (!isasync)
//{
// continuation.Invoke(frame, null);
//}
}
};
}
return continuation;
}
private void Execute()
{
_messageBody = MessageBody.For(
_httpVersion,
_requestHeaders,
_context);
this);
_keepAlive = _messageBody.RequestKeepAlive;
_callContext = CreateCallContext();
_context.SocketInput.Free();
SocketInput.Free();
Task.Run(ExecuteAsync);
}
@ -194,7 +183,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Exception error = null;
try
{
await _context.Application.Invoke(_callContext);
await Application.Invoke(_callContext);
await _upgradeTask;
}
catch (Exception ex)
@ -210,7 +199,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private CallContext CreateCallContext()
{
_inputStream = new FrameRequestStream(_messageBody);
_outputStream = new FrameResponseStream(OnWrite);
_outputStream = new FrameResponseStream(this);
_duplexStream = new FrameDuplexStream(_inputStream, _outputStream);
var remoteIpAddress = "127.0.0.1";
@ -279,13 +268,13 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
return callContext;
}
void OnWrite(ArraySegment<byte> data, Action<object> callback, object state)
public void Write(ArraySegment<byte> data, Action<object> callback, object state)
{
ProduceStart();
_context.SocketOutput.Write(data, callback, state);
SocketOutput.Write(data, callback, state);
}
void Upgrade(IDictionary<string, object> options, Func<object, Task> callback)
public void Upgrade(IDictionary<string, object> options, Func<object, Task> callback)
{
_keepAlive = false;
ProduceStart();
@ -293,7 +282,22 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
_upgradeTask = callback(_callContext);
}
void ProduceStart()
byte[] _continueBytes = Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n");
public void ProduceContinue()
{
if (_resultStarted) return;
string[] expect;
if (_httpVersion.Equals("HTTP/1.1") &&
_requestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
SocketOutput.Write(new ArraySegment<byte>(_continueBytes, 0, _continueBytes.Length), _ => { }, null);
}
}
public void ProduceStart()
{
if (_resultStarted) return;
@ -305,27 +309,27 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
response.ReasonPhrase);
var responseHeader = CreateResponseHeader(status, _responseHeaders);
_context.SocketOutput.Write(responseHeader.Item1, x => ((IDisposable)x).Dispose(), responseHeader.Item2);
SocketOutput.Write(responseHeader.Item1, x => ((IDisposable)x).Dispose(), responseHeader.Item2);
}
private void ProduceEnd(Exception ex)
public void ProduceEnd(Exception ex)
{
ProduceStart();
if (!_keepAlive)
{
_context.ConnectionControl.End(ProduceEndType.SocketShutdownSend);
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
}
_messageBody.Drain(() =>
_context.ConnectionControl.End(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect));
ConnectionControl.End(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect));
}
private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
string status, IEnumerable<KeyValuePair<string, string[]>> headers)
{
var writer = new MemoryPoolTextWriter(_context.Memory);
var writer = new MemoryPoolTextWriter(Memory);
writer.Write(_httpVersion);
writer.Write(' ');
writer.Write(status);

View File

@ -63,11 +63,25 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
//NOTE todo
throw new NotImplementedException();
//var tcs = new TaskCompletionSource<int>(state);
//_body.ReadAsync(new ArraySegment<byte>(buffer, offset, count));
//return tcs.Task;
var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count));
task.ContinueWith((t, x) =>
{
var tcs2 = (TaskCompletionSource<int>)x;
if (t.IsCanceled)
{
tcs2.SetCanceled();
}
else if (t.IsFaulted)
{
tcs2.SetException(t.Exception);
}
else
{
tcs2.SetResult(t.Result);
}
}, tcs);
return tcs.Task;
}
public override void Write(byte[] buffer, int offset, int count)

View File

@ -10,11 +10,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
class FrameResponseStream : Stream
{
readonly Action<ArraySegment<byte>, Action<object>, object> _write;
private readonly FrameContext _context;
public FrameResponseStream(Action<ArraySegment<byte>, Action<object>, object> write)
public FrameResponseStream(FrameContext context)
{
_write = write;
_context = context;
}
public override void Flush()
@ -25,7 +25,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override Task FlushAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_write(new ArraySegment<byte>(new byte[0]), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
_context.FrameControl.Write(new ArraySegment<byte>(new byte[0]), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
return tcs.Task;
}
@ -52,7 +52,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_write(new ArraySegment<byte>(buffer, offset, count), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
_context.FrameControl.Write(new ArraySegment<byte>(buffer, offset, count), x => ((TaskCompletionSource<int>)x).SetResult(0), tcs);
return tcs.Task;
}

View File

@ -12,7 +12,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public bool RequestKeepAlive { get; protected set; }
protected MessageBody(ConnectionContext context) : base(context)
protected MessageBody(FrameContext context) : base(context)
{
}
@ -35,7 +35,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public static MessageBody For(
string httpVersion,
IDictionary<string, string[]> headers,
ConnectionContext context)
FrameContext context)
{
// see also http://tools.ietf.org/html/rfc2616#section-4.4
@ -99,7 +99,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
class ForRemainingData : MessageBody
{
public ForRemainingData(ConnectionContext context)
public ForRemainingData(FrameContext context)
: base(context)
{
}
@ -124,7 +124,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
private readonly int _contentLength;
private int _neededLength;
public ForContentLength(bool keepAlive, int contentLength, ConnectionContext context)
public ForContentLength(bool keepAlive, int contentLength, FrameContext context)
: base(context)
{
RequestKeepAlive = keepAlive;
@ -168,7 +168,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
};
public ForChunkedEncoding(bool keepAlive, ConnectionContext context)
public ForChunkedEncoding(bool keepAlive, FrameContext context)
: base(context)
{
RequestKeepAlive = keepAlive;

View File

@ -15,14 +15,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public class MessageBodyExchanger
{
private static readonly WaitCallback _completePending = CompletePending;
protected readonly ConnectionContext _context;
protected readonly FrameContext _context;
object _sync = new Object();
ArraySegment<byte> _buffer;
Queue<ReadOperation> _reads = new Queue<ReadOperation>();
bool _send100Continue = true;
public MessageBodyExchanger(ConnectionContext context)
public MessageBodyExchanger(FrameContext context)
{
_context = context;
_buffer = new ArraySegment<byte>(_context.Memory.Empty);
@ -32,9 +33,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public void Transfer(int count, bool fin)
{
if (count == 0 && !fin)
{
return;
}
var input = _context.SocketInput;
lock (_sync)
{
if (_send100Continue)
{
_send100Continue = false;
}
// NOTE: this should not copy each time
var oldBuffer = _buffer;
var newData = _context.SocketInput.Take(count);
@ -63,7 +73,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public Task<int> ReadAsync(ArraySegment<byte> buffer)
{
for (; ;)
Task<int> result = null;
var send100Continue = false;
while (result == null)
{
while (CompletePending())
{
@ -83,7 +95,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
var count = Math.Min(buffer.Count, _buffer.Count);
Array.Copy(_buffer.Array, _buffer.Offset, buffer.Array, buffer.Offset, count);
_buffer = new ArraySegment<byte>(_buffer.Array, _buffer.Offset + count, _buffer.Count - count);
return Task.FromResult(count);
result = Task.FromResult(count);
}
else
{
@ -94,10 +106,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
Buffer = buffer,
CompletionSource = tcs,
});
return tcs.Task;
result = tcs.Task;
send100Continue = _send100Continue;
_send100Continue = false;
}
}
}
if (send100Continue)
{
_context.FrameControl.ProduceContinue();
}
return result;
}
static void CompletePending(object state)

View File

@ -4,6 +4,7 @@
using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
@ -22,6 +23,7 @@ namespace Microsoft.AspNet.Server.Kestrel
Queue<Work> _workRunning = new Queue<Work>();
object _workSync = new Object();
bool _stopImmediate = false;
private ExceptionDispatchInfo _closeError;
public KestrelThread(KestrelEngine engine)
{
@ -51,6 +53,10 @@ namespace Microsoft.AspNet.Server.Kestrel
_thread.Abort();
}
}
if (_closeError != null)
{
_closeError.Throw();
}
}
private void OnStop(object obj)
@ -86,20 +92,27 @@ namespace Microsoft.AspNet.Server.Kestrel
{
tcs.SetException(ex);
}
var ran1 = _loop.Run();
if (_stopImmediate)
try
{
// thread-abort form of exit, resources will be leaked
return;
var ran1 = _loop.Run();
if (_stopImmediate)
{
// thread-abort form of exit, resources will be leaked
return;
}
// run the loop one more time to delete the _post handle
_post.Reference();
_post.DangerousClose();
var ran2 = _loop.Run();
// delete the last of the unmanaged memory
_loop.Close();
}
catch (Exception ex)
{
_closeError = ExceptionDispatchInfo.Capture(ex);
}
// run the loop one more time to delete the _post handle
_post.Reference();
_post.Close();
var ran2 = _loop.Run();
// delete the last of the unmanaged memory
_loop.Close();
}
private void OnPost()

View File

@ -23,6 +23,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
_uv.async_init(loop, this, _uv_async_cb);
}
public void DangerousClose()
{
Close();
ReleaseHandle();
}
private void UvAsyncCb(IntPtr handle)
{
_callback.Invoke();

View File

@ -34,20 +34,21 @@ namespace Microsoft.AspNet.Server.KestralTests
{
var request = callContext as IHttpRequestFeature;
var response = callContext as IHttpResponseFeature;
response.Headers["Transfer-Encoding"] = new[] { "chunked" };
var data = new MemoryStream();
for (; ;)
{
var buffer = new byte[8192];
var count = await request.Body.ReadAsync(buffer, 0, buffer.Length);
var hex = Encoding.ASCII.GetBytes(count.ToString("x") + "\r\n");
await response.Body.WriteAsync(hex, 0, hex.Length);
if (count == 0)
{
break;
}
await response.Body.WriteAsync(buffer, 0, count);
await response.Body.WriteAsync(new[] { (byte)'\r', (byte)'\n' }, 0, 2);
data.Write(buffer, 0, count);
}
var bytes = data.ToArray();
response.Headers["Content-Length"] = new[] { bytes.Length.ToString() };
await response.Body.WriteAsync(bytes, 0, bytes.Length);
}
[Fact]
@ -118,12 +119,12 @@ Hello World");
Transceive(
@"POST / HTTP/1.0
Content-Length: 5
Content-Length: 11
Hello World",
@"HTTP/1.0 200 OK
Hello");
Hello World");
started.Dispose();
engine.Stop();
}
@ -135,19 +136,21 @@ Hello");
engine.Start(1);
var started = engine.CreateServer(App);
Transceive(
@"POST / HTTP/1.0
Transfer-Encoding: chunked
using (var connection = new TestConnection())
{
await connection.Send(
"POST / HTTP/1.0",
"Transfer-Encoding: chunked",
"",
"5", "Hello", "6", " World", "0\r\n");
await connection.ReceiveEnd(
"HTTP/1.0 200 OK",
"",
"Hello World");
}
5
Hello
6
World
0
ignored",
@"HTTP/1.0 200 OK
Hello World");
started.Dispose();
engine.Stop();
}
@ -160,25 +163,27 @@ Hello World");
engine.Start(1);
var started = engine.CreateServer(AppChunked);
Transceive(
@"GET / HTTP/1.0
Connection: Keep-Alive
using (var connection = new TestConnection())
{
await connection.SendEnd(
"GET / HTTP/1.0",
"Connection: keep-alive",
"",
"POST / HTTP/1.0",
"",
"Goodbye");
await connection.Receive(
"HTTP/1.0 200 OK",
"Content-Length: 0",
"Connection: keep-alive",
"\r\n");
await connection.ReceiveEnd(
"HTTP/1.0 200 OK",
"Content-Length: 7",
"",
"Goodbye");
}
POST / HTTP/1.0
Goodbye",
@"HTTP/1.0 200 OK
Transfer-Encoding: chunked
Connection: keep-alive
0
HTTP/1.0 200 OK
Transfer-Encoding: chunked
7
Goodbye
0
");
started.Dispose();
engine.Stop();
}
@ -190,28 +195,28 @@ Goodbye
engine.Start(1);
var started = engine.CreateServer(AppChunked);
Transceive(
@"POST / HTTP/1.0
Connection: Keep-Alive
Content-Length: 11
Hello WorldPOST / HTTP/1.0
Goodbye",
@"HTTP/1.0 200 OK
Transfer-Encoding: chunked
Connection: keep-alive
b
Hello World
0
HTTP/1.0 200 OK
Transfer-Encoding: chunked
7
Goodbye
0
");
using (var connection = new TestConnection())
{
await connection.SendEnd(
"POST / HTTP/1.0",
"Connection: keep-alive",
"Content-Length: 11",
"",
"Hello WorldPOST / HTTP/1.0",
"",
"Goodbye");
await connection.Receive(
"HTTP/1.0 200 OK",
"Content-Length: 11",
"Connection: keep-alive",
"",
"Hello World");
await connection.ReceiveEnd(
"HTTP/1.0 200 OK",
"Content-Length: 7",
"",
"Goodbye");
}
started.Dispose();
engine.Stop();
}
@ -223,37 +228,52 @@ Goodbye
engine.Start(1);
var started = engine.CreateServer(AppChunked);
Transceive(
@"POST / HTTP/1.0
Transfer-Encoding: chunked
Connection: keep-alive
using (var connection = new TestConnection())
{
await connection.SendEnd(
"POST / HTTP/1.0",
"Transfer-Encoding: chunked",
"Connection: keep-alive",
"",
"5", "Hello", "6", " World", "0",
"POST / HTTP/1.0",
"",
"Goodbye");
await connection.Receive(
"HTTP/1.0 200 OK",
"Content-Length: 11",
"Connection: keep-alive",
"",
"Hello World");
await connection.ReceiveEnd(
"HTTP/1.0 200 OK",
"Content-Length: 7",
"",
"Goodbye");
}
5
Hello
6
World
0
POST / HTTP/1.0
Goodbye",
@"HTTP/1.0 200 OK
Transfer-Encoding: chunked
Connection: keep-alive
b
Hello World
0
HTTP/1.0 200 OK
Transfer-Encoding: chunked
7
Goodbye
0
");
started.Dispose();
engine.Stop();
}
[Fact(Skip = "This is still not working")]
public async Task Expect100ContinueForBody()
{
var engine = new KestrelEngine();
engine.Start(1);
var started = engine.CreateServer(AppChunked);
using (var connection = new TestConnection())
{
await connection.Send("POST / HTTP/1.1", "Expect: 100-continue", "Content-Length: 11", "\r\n");
await connection.Receive("HTTP/1.1 100 Continue", "\r\n");
await connection.SendEnd("Hello World");
await connection.ReceiveEnd("HTTP/1.1 200 OK", "Content-Length: 11", "", "Hello World");
}
started.Dispose();
engine.Stop();
}
private void Transceive(string send, string expected)
{
@ -261,11 +281,25 @@ Goodbye
socket.Connect(new IPEndPoint(IPAddress.Loopback, 4001));
var stream = new NetworkStream(socket, false);
var writer = new StreamWriter(stream, Encoding.ASCII);
writer.Write(send);
writer.Flush();
stream.Flush();
socket.Shutdown(SocketShutdown.Send);
Task.Run(async () =>
{
try
{
var writer = new StreamWriter(stream, Encoding.ASCII);
foreach (var ch in send)
{
await writer.WriteAsync(ch);
await writer.FlushAsync();
await Task.Delay(TimeSpan.FromMilliseconds(5));
}
writer.Flush();
stream.Flush();
socket.Shutdown(SocketShutdown.Send);
}
catch (Exception ex)
{
}
});
var reader = new StreamReader(stream, Encoding.ASCII);
var actual = reader.ReadToEnd();

View File

@ -18,7 +18,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
var exchanger = new MessageBodyExchanger(testInput.ConnectionContext);
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
@ -54,7 +54,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
var exchanger = new MessageBodyExchanger(testInput.ConnectionContext);
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
testInput.Add("Hello");
@ -84,7 +84,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
var exchanger = new MessageBodyExchanger(testInput.ConnectionContext);
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
@ -117,7 +117,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
var exchanger = new MessageBodyExchanger(testInput.ConnectionContext);
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
var buffer1 = new byte[1024];
var buffer2 = new byte[1024];
@ -155,7 +155,7 @@ namespace Microsoft.AspNet.Server.KestralTests
var context = new ConnectionContext();
context.SocketInput = new SocketInput(new MemoryPool());
var exchanger = new MessageBodyExchanger(testInput.ConnectionContext);
var exchanger = new MessageBodyExchanger(testInput.FrameContext);
testInput.Add("Hello");

View File

@ -16,7 +16,7 @@ namespace Microsoft.AspNet.Server.KestralTests
public async Task Http10ConnectionClose()
{
var input = new TestInput();
var body = MessageBody.For("HTTP/1.0", new Dictionary<string, string[]>(), input.ConnectionContext);
var body = MessageBody.For("HTTP/1.0", new Dictionary<string, string[]>(), input.FrameContext);
var stream = new FrameRequestStream(body);
input.Add("Hello", true);
@ -35,7 +35,7 @@ namespace Microsoft.AspNet.Server.KestralTests
public async Task Http10ConnectionCloseAsync()
{
var input = new TestInput();
var body = MessageBody.For("HTTP/1.0", new Dictionary<string, string[]>(), input.ConnectionContext);
var body = MessageBody.For("HTTP/1.0", new Dictionary<string, string[]>(), input.FrameContext);
var stream = new FrameRequestStream(body);
input.Add("Hello", true);

View File

@ -31,6 +31,7 @@
<Compile Include="MessageBodyExchangerTests.cs" />
<Compile Include="MessageBodyTests.cs" />
<Compile Include="NetworkingTests.cs" />
<Compile Include="TestConnection.cs" />
<Compile Include="TestInput.cs" />
</ItemGroup>
<Import Project="$(VSToolsPath)\AspNet\Microsoft.Web.AspNet.targets" Condition="'$(VSToolsPath)' != ''" />

View File

@ -0,0 +1,87 @@
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace Microsoft.AspNet.Server.KestralTests
{
/// <summary>
/// Summary description for TestConnection
/// </summary>
public class TestConnection : IDisposable
{
private Socket _socket;
private NetworkStream _stream;
private StreamReader _reader;
public TestConnection()
{
Create();
}
public void Create()
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_socket.Connect(new IPEndPoint(IPAddress.Loopback, 4001));
_stream = new NetworkStream(_socket, false);
_reader = new StreamReader(_stream, Encoding.ASCII);
}
public void Dispose()
{
_stream.Close();
_socket.Close();
}
public async Task Send(params string[] lines)
{
var text = String.Join("\r\n", lines);
var writer = new StreamWriter(_stream, Encoding.ASCII);
foreach (var ch in text)
{
await writer.WriteAsync(ch);
await writer.FlushAsync();
await Task.Delay(TimeSpan.FromMilliseconds(5));
}
writer.Flush();
_stream.Flush();
}
public async Task SendEnd(params string[] lines)
{
await Send(lines);
_socket.Shutdown(SocketShutdown.Send);
}
public async Task Receive(params string[] lines)
{
var expected = String.Join("\r\n", lines);
var actual = new char[expected.Length];
var offset = 0;
while (offset < expected.Length)
{
var task = _reader.ReadAsync(actual, offset, actual.Length - offset);
Assert.True(task.Wait(1000), "timeout");
var count = await task;
if (count == 0)
{
break;
}
offset += count;
}
Assert.Equal(expected, new String(actual, 0, offset));
}
public async Task ReceiveEnd(params string[] lines)
{
await Receive(lines);
var ch = new char[1];
var count = await _reader.ReadAsync(ch, 0, 1);
Assert.Equal(0, count);
}
}
}

View File

@ -3,32 +3,53 @@ using Microsoft.AspNet.Server.Kestrel.Http;
namespace Microsoft.AspNet.Server.KestralTests
{
class TestInput
class TestInput : IConnectionControl, IFrameControl
{
public TestInput()
{
var memory = new MemoryPool();
ConnectionContext = new ConnectionContext
FrameContext = new FrameContext
{
SocketInput = new SocketInput(memory),
Memory = memory,
ConnectionControl = this,
FrameControl = this
};
}
public ConnectionContext ConnectionContext { get; set; }
public FrameContext FrameContext { get; set; }
public void Add(string text, bool fin = false)
{
var encoding = System.Text.Encoding.ASCII;
var count = encoding.GetByteCount(text);
var buffer = ConnectionContext.SocketInput.Available(text.Length);
var buffer = FrameContext.SocketInput.Available(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Array, buffer.Offset);
ConnectionContext.SocketInput.Extend(count);
FrameContext.SocketInput.Extend(count);
if (fin)
{
ConnectionContext.SocketInput.RemoteIntakeFin = true;
FrameContext.SocketInput.RemoteIntakeFin = true;
}
}
public void ProduceContinue()
{
}
public void Pause()
{
}
public void Resume()
{
}
public void Write(ArraySegment<byte> data, Action<object> callback, object state)
{
}
public void End(ProduceEndType endType)
{
}
}
}