diff --git a/KestrelHttpServer.sln b/KestrelHttpServer.sln index 9b0ccec28c..fbfe94da08 100644 --- a/KestrelHttpServer.sln +++ b/KestrelHttpServer.sln @@ -12,6 +12,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution global.json = global.json EndProjectSection EndProject +Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "SampleApp", "src\SampleApp\SampleApp.kproj", "{2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -26,6 +28,10 @@ Global {37F3BFB2-6454-49E5-9D7F-581BF755CCFE}.Debug|Any CPU.Build.0 = Debug|Any CPU {37F3BFB2-6454-49E5-9D7F-581BF755CCFE}.Release|Any CPU.ActiveCfg = Release|Any CPU {37F3BFB2-6454-49E5-9D7F-581BF755CCFE}.Release|Any CPU.Build.0 = Release|Any CPU + {2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/CallContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/CallContext.cs new file mode 100644 index 0000000000..39b11b7ac3 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/CallContext.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.HttpFeature; +using System; +using System.Collections.Generic; +using System.IO; + +namespace Microsoft.AspNet.Server.Kestrel +{ + /// + /// Summary description for CallContext + /// + public class CallContext : + IHttpRequestFeature, + IHttpResponseFeature + { + public CallContext() + { + ((IHttpResponseFeature)this).StatusCode = 200; + } + + Stream IHttpResponseFeature.Body { get; set; } + + Stream IHttpRequestFeature.Body { get; set; } + + IDictionary IHttpResponseFeature.Headers { get; set; } + + IDictionary IHttpRequestFeature.Headers { get; set; } + + string IHttpRequestFeature.Method { get; set; } + + string IHttpRequestFeature.Path { get; set; } + + string IHttpRequestFeature.PathBase { get; set; } + + string IHttpRequestFeature.Protocol { get; set; } + + string IHttpRequestFeature.QueryString { get; set; } + + string IHttpResponseFeature.ReasonPhrase { get; set; } + + string IHttpRequestFeature.Scheme { get; set; } + + int IHttpResponseFeature.StatusCode { get; set; } + + void IHttpResponseFeature.OnSendingHeaders(Action callback, object state) + { + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs new file mode 100644 index 0000000000..89837db215 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -0,0 +1,159 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel.Networking; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class ConnectionContext : ListenerContext + { + public ConnectionContext() + { + } + + public ConnectionContext(ListenerContext context) : base(context) + { + } + + public SocketInput SocketInput { get; set; } + public ISocketOutput SocketOutput { get; set; } + + public IConnectionControl ConnectionControl { get; set; } + } + + public interface IConnectionControl + { + void Pause(); + void Resume(); + void End(ProduceEndType endType); + } + + public class Connection : ConnectionContext, IConnectionControl + { + private static readonly Action _readCallback = ReadCallback; + private static readonly Func _allocCallback = AllocCallback; + + private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state) + { + return ((Connection)state).OnAlloc(handle, suggestedSize); + } + + private static void ReadCallback(UvStreamHandle handle, int nread, object state) + { + ((Connection)state).OnRead(handle, nread); + } + + + private readonly Func _app; + private readonly UvStreamHandle _socket; + + private Frame _frame; + + private Action _fault; + private Action _frameConsumeCallback; + private Action _receiveAsyncCompleted; + private Frame _receiveAsyncCompletedFrame; + + public Connection(ListenerContext context, UvStreamHandle socket) : base(context) + { + _socket = socket; + ConnectionControl = this; + } + + public void Start() + { + //_services.Trace.Event(TraceEventType.Start, TraceMessage.Connection); + + SocketInput = new SocketInput(Memory); + SocketOutput = new SocketOutput(Thread, _socket); + + _socket.ReadStart(_allocCallback, _readCallback, this); + + //_fault = ex => { Debug.WriteLine(ex.Message); }; + + //_frameConsumeCallback = (frame, error) => + //{ + // if (error != null) + // { + // _fault(error); + // } + // try + // { + // Go(false, frame); + // } + // catch (Exception ex) + // { + // _fault(ex); + // } + //}; + + //try + //{ + // //_socket.Blocking = false; + // //_socket.NoDelay = true; + // Go(true, null); + //} + //catch (Exception ex) + //{ + // _fault(ex); + //} + } + + private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize) + { + return new Libuv.uv_buf_t + { + memory = SocketInput.Pin(2048), + len = 2048 + }; + } + + private void OnRead(UvStreamHandle handle, int nread) + { + SocketInput.Unpin(nread); + + if (nread == 0) + { + SocketInput.RemoteIntakeFin = true; + } + + if (_frame == null) + { + _frame = new Frame(this); + } + _frame.Consume(); + } + + void IConnectionControl.Pause() + { + _socket.ReadStop(); + } + + void IConnectionControl.Resume() + { + _socket.ReadStart(_allocCallback, _readCallback, this); + } + + void IConnectionControl.End(ProduceEndType endType) + { + switch (endType) + { + case ProduceEndType.SocketShutdownSend: + var shutdown = new UvShutdownReq(); + shutdown.Init(Thread.Loop); + shutdown.Shutdown(_socket, (req, status, state) => req.Close(), null); + break; + case ProduceEndType.ConnectionKeepAlive: + break; + case ProduceEndType.SocketDisconnect: + _socket.Close(); + break; + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs new file mode 100644 index 0000000000..0697149351 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -0,0 +1,540 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.HttpFeature; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +// ReSharper disable AccessToModifiedClosure + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + + public enum ProduceEndType + { + SocketShutdownSend, + SocketDisconnect, + ConnectionKeepAlive, + } + + public class Frame + { + private ConnectionContext _context; + + Mode _mode; + + enum Mode + { + StartLine, + MessageHeader, + MessageBody, + Terminated, + } + + + private string _method; + private string _requestUri; + private string _path; + private string _queryString; + private string _httpVersion; + + private readonly IDictionary _requestHeaders = + new Dictionary(StringComparer.OrdinalIgnoreCase); + + readonly IDictionary _responseHeaders = + new Dictionary(StringComparer.OrdinalIgnoreCase); + + private MessageBody _messageBody; + private bool _resultStarted; + private bool _keepAlive; + + private CallContext _callContext; + /* + //IDictionary _environment; + + CancellationTokenSource _cts = new CancellationTokenSource(); + */ + FrameResponseStream _outputStream; + FrameRequestStream _inputStream; + FrameDuplexStream _duplexStream; + + Task _upgradeTask = _completedTask; + static readonly Task _completedTask = Task.FromResult(0); + + public Frame(ConnectionContext context) + { + _context = context; + } + /* + public bool LocalIntakeFin + { + get + { + return _mode == Mode.MessageBody + ? _messageBody.LocalIntakeFin + : _mode == Mode.Terminated; + } + } + */ + public void Consume() + { + var input = _context.SocketInput; + for (; ;) + { + switch (_mode) + { + case Mode.StartLine: + if (input.RemoteIntakeFin) + { + _mode = Mode.Terminated; + return; + } + + if (!TakeStartLine(input)) + { + return; + } + + _mode = Mode.MessageHeader; + break; + + case Mode.MessageHeader: + if (input.RemoteIntakeFin) + { + _mode = Mode.Terminated; + return; + } + + var endOfHeaders = false; + while (!endOfHeaders) + { + if (!TakeMessageHeader(input, out endOfHeaders)) + { + return; + } + } + + //var resumeBody = HandleExpectContinue(callback); + Execute(); + _mode = Mode.MessageBody; + break; + + case Mode.MessageBody: + _messageBody.Consume(); + // NOTE: keep looping? + return; + + case Mode.Terminated: + return; + } + } + } + + Action HandleExpectContinue(Action continuation) + { + string[] expect; + if (_httpVersion.Equals("HTTP/1.1") && + _requestHeaders.TryGetValue("Expect", out expect) && + (expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase)) + { + return (frame, error) => + { + if (_resultStarted) + { + continuation.Invoke(frame, error); + } + else + { + var bytes = Encoding.Default.GetBytes("HTTP/1.1 100 Continue\r\n\r\n"); + + //var isasync = _context.SocketOutput.Write( + // new ArraySegment(bytes), + // error2 => continuation(frame, error2)); + + //if (!isasync) + //{ + // continuation.Invoke(frame, null); + //} + } + }; + } + return continuation; + } + + private void Execute() + { + _messageBody = MessageBody.For( + _httpVersion, + _requestHeaders, + _context); + _keepAlive = _messageBody.RequestKeepAlive; + _callContext = CreateCallContext(); + _context.SocketInput.Free(); + Task.Run(ExecuteAsync); + } + + private async Task ExecuteAsync() + { + Exception error = null; + try + { + await _context.Application.Invoke(_callContext); + await _upgradeTask; + } + catch (Exception ex) + { + error = ex; + } + finally + { + ProduceEnd(error); + } + } + + private CallContext CreateCallContext() + { + _inputStream = new FrameRequestStream(_messageBody); + _outputStream = new FrameResponseStream(OnWrite); + _duplexStream = new FrameDuplexStream(_inputStream, _outputStream); + + var remoteIpAddress = "127.0.0.1"; + var remotePort = "0"; + var localIpAddress = "127.0.0.1"; + var localPort = "80"; + var isLocal = false; + + //if (_context.Socket != null) + //{ + // var remoteEndPoint = _context.Socket.RemoteEndPoint as IPEndPoint; + // if (remoteEndPoint != null) + // { + // remoteIpAddress = remoteEndPoint.Address.ToString(); + // remotePort = remoteEndPoint.Port.ToString(CultureInfo.InvariantCulture); + // } + + // var localEndPoint = _context.Socket.LocalEndPoint as IPEndPoint; + // if (localEndPoint != null) + // { + // localIpAddress = localEndPoint.Address.ToString(); + // localPort = localEndPoint.Port.ToString(CultureInfo.InvariantCulture); + // } + + // if (remoteEndPoint != null && localEndPoint != null) + // { + // isLocal = Equals(remoteEndPoint.Address, localEndPoint.Address); + // } + //} + + var callContext = new CallContext(); + var request = (IHttpRequestFeature)callContext; + var response = (IHttpResponseFeature)callContext; + //var lifetime = (IHttpRequestLifetimeFeature)callContext; + request.Protocol = _httpVersion; + request.Scheme = "http"; + request.Method = _method; + request.Path = _path; + request.PathBase = ""; + request.QueryString = _queryString; + request.Headers = _requestHeaders; + request.Body = _inputStream; + response.Headers = _responseHeaders; + response.Body = _outputStream; + + //var env = new Dictionary(); + //env["owin.Version"] = "1.0"; + //env["owin.RequestProtocol"] = _httpVersion; + //env["owin.RequestScheme"] = "http"; + //env["owin.RequestMethod"] = _method; + //env["owin.RequestPath"] = _path; + //env["owin.RequestPathBase"] = ""; + //env["owin.RequestQueryString"] = _queryString; + //env["owin.RequestHeaders"] = _requestHeaders; + //env["owin.RequestBody"] = _inputStream; + //env["owin.ResponseHeaders"] = _responseHeaders; + //env["owin.ResponseBody"] = _outputStream; + //env["owin.CallCancelled"] = _cts.Token; + //env["opaque.Upgrade"] = (Action, Func, Task>>)Upgrade; + //env["opaque.Stream"] = _duplexStream; + //env["server.RemoteIpAddress"] = remoteIpAddress; + //env["server.RemotePort"] = remotePort; + //env["server.LocalIpAddress"] = localIpAddress; + //env["server.LocalPort"] = localPort; + //env["server.IsLocal"] = isLocal; + return callContext; + } + + void OnWrite(ArraySegment data, Action callback, object state) + { + ProduceStart(); + _context.SocketOutput.Write(data, callback, state); + } + + void Upgrade(IDictionary options, Func callback) + { + _keepAlive = false; + ProduceStart(); + + _upgradeTask = callback(_callContext); + } + + void ProduceStart() + { + if (_resultStarted) return; + + _resultStarted = true; + + var response = (IHttpResponseFeature)_callContext; + var status = ReasonPhrases.ToStatus( + response.StatusCode, + response.ReasonPhrase); + + var responseHeader = CreateResponseHeader(status, _responseHeaders); + _context.SocketOutput.Write(responseHeader.Item1, x => ((IDisposable)x).Dispose(), responseHeader.Item2); + } + + private void ProduceEnd(Exception ex) + { + ProduceStart(); + + if (!_keepAlive) + { + _context.ConnectionControl.End(ProduceEndType.SocketShutdownSend); + } + + _messageBody.Drain(() => + _context.ConnectionControl.End(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect)); + } + + + private Tuple, IDisposable> CreateResponseHeader( + string status, IEnumerable> headers) + { + var writer = new MemoryPoolTextWriter(_context.Memory); + writer.Write(_httpVersion); + writer.Write(' '); + writer.Write(status); + writer.Write('\r'); + writer.Write('\n'); + + var hasConnection = false; + var hasTransferEncoding = false; + var hasContentLength = false; + if (headers != null) + { + foreach (var header in headers) + { + var isConnection = false; + if (!hasConnection && + string.Equals(header.Key, "Connection", StringComparison.OrdinalIgnoreCase)) + { + hasConnection = isConnection = true; + } + else if (!hasTransferEncoding && + string.Equals(header.Key, "Transfer-Encoding", StringComparison.OrdinalIgnoreCase)) + { + hasTransferEncoding = true; + } + else if (!hasContentLength && + string.Equals(header.Key, "Content-Length", StringComparison.OrdinalIgnoreCase)) + { + hasContentLength = true; + } + + foreach (var value in header.Value) + { + writer.Write(header.Key); + writer.Write(':'); + writer.Write(' '); + writer.Write(value); + writer.Write('\r'); + writer.Write('\n'); + + if (isConnection && value.IndexOf("close", StringComparison.OrdinalIgnoreCase) != -1) + { + _keepAlive = false; + } + } + } + } + + if (hasTransferEncoding == false && hasContentLength == false) + { + _keepAlive = false; + } + if (_keepAlive == false && hasConnection == false && _httpVersion == "HTTP/1.1") + { + writer.Write("Connection: close\r\n\r\n"); + } + else if (_keepAlive && hasConnection == false && _httpVersion == "HTTP/1.0") + { + writer.Write("Connection: keep-alive\r\n\r\n"); + } + else + { + writer.Write('\r'); + writer.Write('\n'); + } + writer.Flush(); + return new Tuple, IDisposable>(writer.Buffer, writer); + } + + private bool TakeStartLine(SocketInput baton) + { + var remaining = baton.Buffer; + if (remaining.Count < 2) + { + return false; + } + var firstSpace = -1; + var secondSpace = -1; + var questionMark = -1; + var ch0 = remaining.Array[remaining.Offset]; + for (var index = 0; index != remaining.Count - 1; ++index) + { + var ch1 = remaining.Array[remaining.Offset + index + 1]; + if (ch0 == '\r' && ch1 == '\n') + { + if (secondSpace == -1) + { + throw new InvalidOperationException("INVALID REQUEST FORMAT"); + } + _method = GetString(remaining, 0, firstSpace); + _requestUri = GetString(remaining, firstSpace + 1, secondSpace); + if (questionMark == -1) + { + _path = _requestUri; + _queryString = string.Empty; + } + else + { + _path = GetString(remaining, firstSpace + 1, questionMark); + _queryString = GetString(remaining, questionMark, secondSpace); + } + _httpVersion = GetString(remaining, secondSpace + 1, index); + baton.Skip(index + 2); + return true; + } + + if (ch0 == ' ' && firstSpace == -1) + { + firstSpace = index; + } + else if (ch0 == ' ' && firstSpace != -1 && secondSpace == -1) + { + secondSpace = index; + } + else if (ch0 == '?' && firstSpace != -1 && questionMark == -1 && secondSpace == -1) + { + questionMark = index; + } + ch0 = ch1; + } + return false; + } + + static string GetString(ArraySegment range, int startIndex, int endIndex) + { + return Encoding.Default.GetString(range.Array, range.Offset + startIndex, endIndex - startIndex); + } + + + private bool TakeMessageHeader(SocketInput baton, out bool endOfHeaders) + { + var remaining = baton.Buffer; + endOfHeaders = false; + if (remaining.Count < 2) + { + return false; + } + var ch0 = remaining.Array[remaining.Offset]; + var ch1 = remaining.Array[remaining.Offset + 1]; + if (ch0 == '\r' && ch1 == '\n') + { + endOfHeaders = true; + baton.Skip(2); + return true; + } + + if (remaining.Count < 3) + { + return false; + } + var wrappedHeaders = false; + var colonIndex = -1; + var valueStartIndex = -1; + var valueEndIndex = -1; + for (var index = 0; index != remaining.Count - 2; ++index) + { + var ch2 = remaining.Array[remaining.Offset + index + 2]; + if (ch0 == '\r' && + ch1 == '\n' && + ch2 != ' ' && + ch2 != '\t') + { + var name = Encoding.ASCII.GetString(remaining.Array, remaining.Offset, colonIndex); + var value = ""; + if (valueEndIndex != -1) + { + value = Encoding.ASCII.GetString( + remaining.Array, remaining.Offset + valueStartIndex, valueEndIndex - valueStartIndex); + } + if (wrappedHeaders) + { + value = value.Replace("\r\n", " "); + } + AddRequestHeader(name, value); + baton.Skip(index + 2); + return true; + } + if (colonIndex == -1 && ch0 == ':') + { + colonIndex = index; + } + else if (colonIndex != -1 && + ch0 != ' ' && + ch0 != '\t' && + ch0 != '\r' && + ch0 != '\n') + { + if (valueStartIndex == -1) + { + valueStartIndex = index; + } + valueEndIndex = index + 1; + } + else if (!wrappedHeaders && + ch0 == '\r' && + ch1 == '\n' && + (ch2 == ' ' || + ch2 == '\t')) + { + wrappedHeaders = true; + } + + ch0 = ch1; + ch1 = ch2; + } + return false; + } + + private void AddRequestHeader(string name, string value) + { + string[] existing; + if (!_requestHeaders.TryGetValue(name, out existing) || + existing == null || + existing.Length == 0) + { + _requestHeaders[name] = new[] { value }; + } + else + { + _requestHeaders[name] = existing.Concat(new[] { value }).ToArray(); + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/FrameDuplexStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameDuplexStream.cs new file mode 100644 index 0000000000..7028eacdb4 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameDuplexStream.cs @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + class FrameDuplexStream : Stream + { + readonly FrameRequestStream _requestStream; + readonly FrameResponseStream _responseStream; + + public FrameDuplexStream(FrameRequestStream requestStream, FrameResponseStream responseStream) + { + _requestStream = requestStream; + _responseStream = responseStream; + } + + public override void Close() + { + _requestStream.Close(); + _responseStream.Close(); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _requestStream.Dispose(); + _responseStream.Dispose(); + } + } + + public override void Flush() + { + _responseStream.Flush(); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _requestStream.BeginRead(buffer, offset, count, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return _requestStream.EndRead(asyncResult); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _responseStream.BeginWrite(buffer, offset, count, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + _responseStream.EndWrite(asyncResult); + } + + public override long Seek(long offset, SeekOrigin origin) + { + return _requestStream.Seek(offset, origin); + } + + public override void SetLength(long value) + { + _requestStream.SetLength(value); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _requestStream.Read(buffer, offset, count); + } + + public override int ReadByte() + { + return _requestStream.ReadByte(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _responseStream.Write(buffer, offset, count); + } + + public override void WriteByte(byte value) + { + _responseStream.WriteByte(value); + } + + public override bool CanRead + { + get + { + return _requestStream.CanRead; + } + } + + public override bool CanSeek + { + get + { + return _requestStream.CanSeek; + } + } + + public override bool CanTimeout + { + get + { + return _responseStream.CanTimeout || _requestStream.CanTimeout; + } + } + + public override bool CanWrite + { + get + { + return _responseStream.CanWrite; + } + } + + public override long Length + { + get + { + return _requestStream.Length; + } + } + + public override long Position + { + get + { + return _requestStream.Position; + } + set + { + _requestStream.Position = value; + } + } + + public override int ReadTimeout + { + get + { + return _requestStream.ReadTimeout; + } + set + { + _requestStream.ReadTimeout = value; + } + } + + public override int WriteTimeout + { + get + { + return _responseStream.WriteTimeout; + } + set + { + _responseStream.WriteTimeout = value; + } + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs new file mode 100644 index 0000000000..16a4c90920 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class FrameRequestStream : Stream + { + readonly MessageBody _body; + + //int _readLength; + //bool _readFin; + //Exception _readError; + + public FrameRequestStream(MessageBody body) + { + _body = body; + } + + public override void Flush() + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).Result; + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + var task = ReadAsync(buffer, offset, count, CancellationToken.None, state); + if (callback != null) + { + task.ContinueWith(t => callback.Invoke(t)); + } + return task; + } + + public override int EndRead(IAsyncResult asyncResult) + { + return ((Task)asyncResult).Result; + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _body.ReadAsync(new ArraySegment(buffer, offset, count)); + } + + public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state) + { + //NOTE todo + throw new NotImplementedException(); + //var tcs = new TaskCompletionSource(state); + //_body.ReadAsync(new ArraySegment(buffer, offset, count)); + //return tcs.Task; + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override bool CanRead { get { return true; } } + + public override bool CanSeek { get { return false; } } + + public override bool CanWrite { get { return false; } } + + public override long Length + { + get + { + throw new NotImplementedException(); + } + } + + public override long Position { get; set; } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/FrameResponseStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameResponseStream.cs new file mode 100644 index 0000000000..d4b34fbce6 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/FrameResponseStream.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + class FrameResponseStream : Stream + { + readonly Action, Action, object> _write; + + public FrameResponseStream(Action, Action, object> write) + { + _write = write; + } + + public override void Flush() + { + //_write(default(ArraySegment), null); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(); + _write(new ArraySegment(new byte[0]), x => ((TaskCompletionSource)x).SetResult(0), tcs); + return tcs.Task; + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(); + _write(new ArraySegment(buffer, offset, count), x => ((TaskCompletionSource)x).SetResult(0), tcs); + return tcs.Task; + } + + public override bool CanRead + { + get + { + return false; + } + } + + public override bool CanSeek + { + get + { + return false; + } + } + + public override bool CanWrite + { + get + { + return true; + } + } + + public override long Length + { + get + { + throw new NotImplementedException(); + } + } + + public override long Position { get; set; } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs index 720db481fe..95995f9100 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs @@ -1,36 +1,58 @@ -using Microsoft.AspNet.Server.Kestrel.Networking; +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Server.Kestrel.Networking; using System; -using System.Collections.Generic; using System.Net; using System.Text; -using System.Threading; using System.Threading.Tasks; -namespace Microsoft.AspNet.Server.Kestrel +namespace Microsoft.AspNet.Server.Kestrel.Http { + public class ListenerContext + { + public ListenerContext() { } + + public ListenerContext(ListenerContext context) + { + Thread = context.Thread; + Application = context.Application; + Memory = context.Memory; + } + + public KestrelThread Thread { get; set; } + + public Func Application { get; set; } + + public IMemoryPool Memory { get; set; } + } + /// /// Summary description for Accept /// - public class Listener : IDisposable + public class Listener : ListenerContext, IDisposable { - private readonly KestrelThread _thread; - UvTcpHandle _socket; - private readonly Action _connectionCallback = ConnectionCallback; + private static readonly Action _connectionCallback = ConnectionCallback; + + UvTcpHandle ListenSocket { get; set; } private static void ConnectionCallback(UvStreamHandle stream, int status, object state) { ((Listener)state).OnConnection(stream, status); } - public Listener(KestrelThread thread) + public Listener(IMemoryPool memory) { - _thread = thread; + Memory = memory; } - public Task StartAsync() + public Task StartAsync(KestrelThread thread, Func app) { + Thread = thread; + Application = app; + var tcs = new TaskCompletionSource(); - _thread.Post(OnStart, tcs); + Thread.Post(OnStart, tcs); return tcs.Task; } @@ -39,10 +61,10 @@ namespace Microsoft.AspNet.Server.Kestrel var tcs = (TaskCompletionSource)parameter; try { - _socket = new UvTcpHandle(); - _socket.Init(_thread.Loop); - _socket.Bind(new IPEndPoint(IPAddress.Any, 4001)); - _socket.Listen(10, _connectionCallback, this); + ListenSocket = new UvTcpHandle(); + ListenSocket.Init(Thread.Loop); + ListenSocket.Bind(new IPEndPoint(IPAddress.Any, 4001)); + ListenSocket.Listen(10, _connectionCallback, this); tcs.SetResult(0); } catch (Exception ex) @@ -51,33 +73,25 @@ namespace Microsoft.AspNet.Server.Kestrel } } - private void OnConnection(UvStreamHandle socket, int status) + private void OnConnection(UvStreamHandle listenSocket, int status) { - var connection = new UvTcpHandle(); - connection.Init(_thread.Loop); - socket.Accept(connection); - connection.ReadStart(OnRead, null); - } + var acceptSocket = new UvTcpHandle(); + acceptSocket.Init(Thread.Loop); + listenSocket.Accept(acceptSocket); - private void OnRead(UvStreamHandle socket, int count, byte[] data, object _) - { - var text = Encoding.UTF8.GetString(data); - if (count <= 0) - { - socket.Close(); - } + var connection = new Connection(this, acceptSocket); + connection.Start(); } public void Dispose() { - var socket = _socket; - _socket = null; - _thread.Post(OnDispose, socket); + Thread.Post(OnDispose, ListenSocket); + ListenSocket = null; } - private void OnDispose(object socket) + private void OnDispose(object listenSocket) { - ((UvHandle)socket).Close(); + ((UvHandle)listenSocket).Close(); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs new file mode 100644 index 0000000000..f97675f776 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Net.Sockets; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public interface IMemoryPool + { + byte[] Empty { get; } + + byte[] AllocByte(int minimumSize); + void FreeByte(byte[] memory); + + char[] AllocChar(int minimumSize); + void FreeChar(char[] memory); + + /// + /// Acquires a sub-segment of a larger memory allocation. Used for async sends of write-behind + /// buffers to reduce number of array segments pinned + /// + /// The smallest length of the ArraySegment.Count that may be returned + /// An array segment which is a sub-block of a larger allocation + ArraySegment AllocSegment(int minimumSize); + + /// + /// Frees a sub-segment of a larger memory allocation produced by AllocSegment. The original ArraySegment + /// must be frees exactly once and must have the same offset and count that was returned by the Alloc. + /// If a segment is not freed it won't be re-used and has the same effect as a memory leak, so callers must be + /// implemented exactly correctly. + /// + /// The sub-block that was originally returned by a call to AllocSegment. + void FreeSegment(ArraySegment segment); + } + + public class MemoryPool : IMemoryPool + { + static readonly byte[] EmptyArray = new byte[0]; + + class Pool + { + readonly Stack _stack = new Stack(); + readonly object _sync = new object(); + + public T[] Alloc(int size) + { + lock (_sync) + { + if (_stack.Count != 0) + { + return _stack.Pop(); + } + } + return new T[size]; + } + + public void Free(T[] value, int limit) + { + lock (_sync) + { + if (_stack.Count < limit) + { + _stack.Push(value); + } + } + } + } + + readonly Pool _pool1 = new Pool(); + readonly Pool _pool2 = new Pool(); + readonly Pool _pool3 = new Pool(); + + public byte[] Empty + { + get + { + return EmptyArray; + } + } + + public byte[] AllocByte(int minimumSize) + { + if (minimumSize == 0) + { + return EmptyArray; + } + if (minimumSize <= 1024) + { + return _pool1.Alloc(1024); + } + if (minimumSize <= 2048) + { + return _pool2.Alloc(2048); + } + return new byte[minimumSize]; + } + + public void FreeByte(byte[] memory) + { + if (memory == null) + { + return; + } + switch (memory.Length) + { + case 1024: + _pool1.Free(memory, 256); + break; + case 2048: + _pool2.Free(memory, 64); + break; + } + } + + public char[] AllocChar(int minimumSize) + { + if (minimumSize == 0) + { + return new char[0]; + } + if (minimumSize <= 128) + { + return _pool3.Alloc(128); + } + return new char[minimumSize]; + } + + public void FreeChar(char[] memory) + { + if (memory == null) + { + return; + } + switch (memory.Length) + { + case 128: + _pool3.Free(memory, 256); + break; + } + } + + public ArraySegment AllocSegment(int minimumSize) + { + return new ArraySegment(AllocByte(minimumSize)); + } + + public void FreeSegment(ArraySegment segment) + { + FreeByte(segment.Array); + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs new file mode 100644 index 0000000000..c094178846 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; +using System.Text; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class MemoryPoolTextWriter : TextWriter + { + private readonly IMemoryPool _memory; + + private char[] _textArray; + private int _textBegin; + private int _textEnd; + // ReSharper disable InconsistentNaming + private const int _textLength = 128; + // ReSharper restore InconsistentNaming + + private byte[] _dataArray; + private int _dataEnd; + + private readonly Encoder _encoder; + + public ArraySegment Buffer + { + get + { + return new ArraySegment(_dataArray, 0, _dataEnd); + } + } + + public MemoryPoolTextWriter(IMemoryPool memory) + { + _memory = memory; + _textArray = _memory.AllocChar(_textLength); + _dataArray = _memory.Empty; + _encoder = Encoding.Default.GetEncoder(); + } + + public override Encoding Encoding + { + get + { + return Encoding.Default; + } + } + + protected override void Dispose(bool disposing) + { + try + { + if (disposing) + { + if (_textArray != null) + { + _memory.FreeChar(_textArray); + _textArray = null; + } + if (_dataArray != null) + { + _memory.FreeByte(_dataArray); + _dataArray = null; + } + } + } + finally + { + base.Dispose(disposing); + } + } + + private void Encode(bool flush) + { + var bytesNeeded = _encoder.GetByteCount( + _textArray, + _textBegin, + _textEnd - _textBegin, + flush); + + Grow(bytesNeeded); + + var bytesUsed = _encoder.GetBytes( + _textArray, + _textBegin, + _textEnd - _textBegin, + _dataArray, + _dataEnd, + flush); + + _textBegin = _textEnd = 0; + _dataEnd += bytesUsed; + } + + private void Grow(int minimumAvailable) + { + if (_dataArray.Length - _dataEnd >= minimumAvailable) + { + return; + } + + var newLength = _dataArray.Length + Math.Max(_dataArray.Length, minimumAvailable); + var newArray = _memory.AllocByte(newLength); + Array.Copy(_dataArray, 0, newArray, 0, _dataEnd); + _memory.FreeByte(_dataArray); + _dataArray = newArray; + } + + public override void Write(char value) + { + if (_textLength == _textEnd) + { + Encode(false); + if (_textLength == _textEnd) + { + throw new InvalidOperationException("Unexplainable failure to encode text"); + } + } + + _textArray[_textEnd++] = value; + } + + public override void Write(string value) + { + var sourceIndex = 0; + var sourceLength = value.Length; + while (sourceIndex < sourceLength) + { + if (_textLength == _textEnd) + { + Encode(false); + } + + var count = sourceLength - sourceIndex; + if (count > _textLength - _textEnd) + { + count = _textLength - _textEnd; + } + + value.CopyTo(sourceIndex, _textArray, _textEnd, count); + sourceIndex += count; + _textEnd += count; + } + } + + public override void Flush() + { + while (_textBegin != _textEnd) + { + Encode(true); + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs new file mode 100644 index 0000000000..0ade9006fb --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -0,0 +1,342 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public static class DelegateExtensions + { + public static void InvokeNoThrow(this Action d) + { + try + { d.Invoke(); } + catch + { } + } + public static void InvokeNoThrow(this Action d, T arg1) + { + try + { d.Invoke(arg1); } + catch + { } + } + } + + public abstract class MessageBody : MessageBodyExchanger + { + private Action _continuation = () => { }; + + public bool RequestKeepAlive { get; protected set; } + + protected MessageBody(ConnectionContext context) : base(context) + { + } + + public void Intake(int count) + { + Transfer(count, false); + } + + public void IntakeFin(int count) + { + Transfer(count, true); + if (_continuation != null) + { + _continuation.Invoke(); + } + } + + public abstract void Consume(); + + public static MessageBody For( + string httpVersion, + IDictionary headers, + ConnectionContext context) + { + // see also http://tools.ietf.org/html/rfc2616#section-4.4 + + var keepAlive = httpVersion != "HTTP/1.0"; + + string connection; + if (TryGet(headers, "Connection", out connection)) + { + keepAlive = connection.Equals("keep-alive", StringComparison.OrdinalIgnoreCase); + } + + string transferEncoding; + if (TryGet(headers, "Transfer-Encoding", out transferEncoding)) + { + return new ForChunkedEncoding(keepAlive, context); + } + + string contentLength; + if (TryGet(headers, "Content-Length", out contentLength)) + { + return new ForContentLength(keepAlive, int.Parse(contentLength), context); + } + + if (keepAlive) + { + return new ForContentLength(true, 0, context); + } + + return new ForRemainingData(context); + } + + public static bool TryGet(IDictionary headers, string name, out string value) + { + string[] values; + if (!headers.TryGetValue(name, out values) || values == null) + { + value = null; + return false; + } + var count = values.Length; + if (count == 0) + { + value = null; + return false; + } + if (count == 1) + { + value = values[0]; + return true; + } + value = String.Join(",", values); + return true; + } + + public void Drain(Action continuation) + { + _continuation = continuation; + _continuation.Invoke(); + } + + + class ForRemainingData : MessageBody + { + public ForRemainingData(ConnectionContext context) + : base(context) + { + } + + public override void Consume() + { + var input = _context.SocketInput; + + if (input.RemoteIntakeFin) + { + IntakeFin(input.Buffer.Count); + } + else + { + Intake(input.Buffer.Count); + } + } + } + + class ForContentLength : MessageBody + { + private readonly int _contentLength; + private int _neededLength; + + public ForContentLength(bool keepAlive, int contentLength, ConnectionContext context) + : base(context) + { + RequestKeepAlive = keepAlive; + _contentLength = contentLength; + _neededLength = _contentLength; + } + + public override void Consume() + { + var input = _context.SocketInput; + var consumeLength = Math.Min(_neededLength, input.Buffer.Count); + _neededLength -= consumeLength; + + var consumed = input.Take(consumeLength); + + if (_neededLength != 0) + { + Intake(consumeLength); + } + else + { + IntakeFin(consumeLength); + } + } + } + + + /// + /// http://tools.ietf.org/html/rfc2616#section-3.6.1 + /// + class ForChunkedEncoding : MessageBody + { + private int _neededLength; + + private Mode _mode = Mode.ChunkSizeLine; + + private enum Mode + { + ChunkSizeLine, + ChunkData, + ChunkDataCRLF, + Complete, + }; + + + public ForChunkedEncoding(bool keepAlive, ConnectionContext context) + : base(context) + { + RequestKeepAlive = keepAlive; + } + + public override void Consume() + { + var input = _context.SocketInput; + for (; ;) + { + switch (_mode) + { + case Mode.ChunkSizeLine: + var chunkSize = 0; + if (!TakeChunkedLine(input, ref chunkSize)) + { + return; + } + + _neededLength = chunkSize; + if (chunkSize == 0) + { + _mode = Mode.Complete; + IntakeFin(0); + return; + } + _mode = Mode.ChunkData; + break; + + case Mode.ChunkData: + if (_neededLength == 0) + { + _mode = Mode.ChunkDataCRLF; + break; + } + if (input.Buffer.Count == 0) + { + return; + } + + var consumeLength = Math.Min(_neededLength, input.Buffer.Count); + _neededLength -= consumeLength; + + Intake(consumeLength); + break; + + case Mode.ChunkDataCRLF: + if (input.Buffer.Count < 2) + { + return; + } + var crlf = input.Take(2); + if (crlf.Array[crlf.Offset] != '\r' || + crlf.Array[crlf.Offset + 1] != '\n') + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + _mode = Mode.ChunkSizeLine; + break; + + default: + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + } + } + + private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut) + { + var remaining = baton.Buffer; + if (remaining.Count < 2) + { + return false; + } + var ch0 = remaining.Array[remaining.Offset]; + var chunkSize = 0; + var mode = 0; + for (var index = 0; index != remaining.Count - 1; ++index) + { + var ch1 = remaining.Array[remaining.Offset + index + 1]; + + if (mode == 0) + { + if (ch0 >= '0' && ch0 <= '9') + { + chunkSize = chunkSize * 0x10 + (ch0 - '0'); + } + else if (ch0 >= 'A' && ch0 <= 'F') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); + } + else if (ch0 >= 'a' && ch0 <= 'f') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); + } + else + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + mode = 1; + } + else if (mode == 1) + { + if (ch0 >= '0' && ch0 <= '9') + { + chunkSize = chunkSize * 0x10 + (ch0 - '0'); + } + else if (ch0 >= 'A' && ch0 <= 'F') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('A' - 10)); + } + else if (ch0 >= 'a' && ch0 <= 'f') + { + chunkSize = chunkSize * 0x10 + (ch0 - ('a' - 10)); + } + else if (ch0 == ';') + { + mode = 2; + } + else if (ch0 == '\r' && ch1 == '\n') + { + baton.Skip(index + 2); + chunkSizeOut = chunkSize; + return true; + } + else + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + } + else if (mode == 2) + { + if (ch0 == '\r' && ch1 == '\n') + { + baton.Skip(index + 2); + chunkSizeOut = chunkSize; + return true; + } + else + { + // chunk-extensions not currently parsed + } + } + + ch0 = ch1; + } + return false; + } + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs new file mode 100644 index 0000000000..29837ad131 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBodyExchanger.cs @@ -0,0 +1,144 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + /// + /// Summary description for MessageBodyExchanger + /// + public class MessageBodyExchanger + { + private static readonly WaitCallback _completePending = CompletePending; + protected readonly ConnectionContext _context; + + object _sync = new Object(); + + ArraySegment _buffer; + Queue _reads = new Queue(); + + public MessageBodyExchanger(ConnectionContext context) + { + _context = context; + _buffer = new ArraySegment(_context.Memory.Empty); + } + + public bool LocalIntakeFin { get; set; } + + public void Transfer(int count, bool fin) + { + var input = _context.SocketInput; + lock (_sync) + { + // NOTE: this should not copy each time + var oldBuffer = _buffer; + var newData = _context.SocketInput.Take(count); + + var newBuffer = new ArraySegment( + _context.Memory.AllocByte(oldBuffer.Count + newData.Count), + 0, + oldBuffer.Count + newData.Count); + + Array.Copy(oldBuffer.Array, oldBuffer.Offset, newBuffer.Array, newBuffer.Offset, oldBuffer.Count); + Array.Copy(newData.Array, newData.Offset, newBuffer.Array, newBuffer.Offset + oldBuffer.Count, newData.Count); + + _buffer = newBuffer; + _context.Memory.FreeByte(oldBuffer.Array); + + if (fin) + { + LocalIntakeFin = true; + } + if (_reads.Any()) + { + ThreadPool.QueueUserWorkItem(_completePending, this); + } + } + } + + public Task ReadAsync(ArraySegment buffer) + { + for (; ;) + { + while (CompletePending()) + { + // earlier reads have priority + } + lock (_sync) + { + if (_buffer.Count != 0 || buffer.Count == 0 || LocalIntakeFin) + { + // there is data we can take right now + if (_reads.Any()) + { + // someone snuck in, try again + continue; + } + + var count = Math.Min(buffer.Count, _buffer.Count); + Array.Copy(_buffer.Array, _buffer.Offset, buffer.Array, buffer.Offset, count); + _buffer = new ArraySegment(_buffer.Array, _buffer.Offset + count, _buffer.Count - count); + return Task.FromResult(count); + } + else + { + // add ourselves to the line + var tcs = new TaskCompletionSource(); + _reads.Enqueue(new ReadOperation + { + Buffer = buffer, + CompletionSource = tcs, + }); + return tcs.Task; + } + } + } + } + + static void CompletePending(object state) + { + while (((MessageBodyExchanger)state).CompletePending()) + { + // loop until none left + } + } + + bool CompletePending() + { + ReadOperation read; + int count; + lock (_sync) + { + if (_buffer.Count == 0 && !LocalIntakeFin) + { + return false; + } + if (!_reads.Any()) + { + return false; + } + read = _reads.Dequeue(); + + count = Math.Min(read.Buffer.Count, _buffer.Count); + Array.Copy(_buffer.Array, _buffer.Offset, read.Buffer.Array, read.Buffer.Offset, count); + _buffer = new ArraySegment(_buffer.Array, _buffer.Offset + count, _buffer.Count - count); + } + if (read.CompletionSource != null) + { + read.CompletionSource.SetResult(count); + } + return true; + } + + public struct ReadOperation + { + public TaskCompletionSource CompletionSource; + public ArraySegment Buffer; + } + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ReasonPhrases.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ReasonPhrases.cs new file mode 100644 index 0000000000..33076d1505 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ReasonPhrases.cs @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Globalization; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public static class ReasonPhrases + { + public static string ToStatus(int statusCode, string reasonPhrase = null) + { + if (string.IsNullOrEmpty(reasonPhrase)) + { + reasonPhrase = ToReasonPhrase(statusCode); + } + return statusCode.ToString(CultureInfo.InvariantCulture) + " " + reasonPhrase; + } + + public static string ToReasonPhrase(int statusCode) + { + switch (statusCode) + { + case 100: + return "Continue"; + case 101: + return "Switching Protocols"; + case 102: + return "Processing"; + case 200: + return "OK"; + case 201: + return "Created"; + case 202: + return "Accepted"; + case 203: + return "Non-Authoritative Information"; + case 204: + return "No Content"; + case 205: + return "Reset Content"; + case 206: + return "Partial Content"; + case 207: + return "Multi-Status"; + case 226: + return "IM Used"; + case 300: + return "Multiple Choices"; + case 301: + return "Moved Permanently"; + case 302: + return "Found"; + case 303: + return "See Other"; + case 304: + return "Not Modified"; + case 305: + return "Use Proxy"; + case 306: + return "Reserved"; + case 307: + return "Temporary Redirect"; + case 400: + return "Bad Request"; + case 401: + return "Unauthorized"; + case 402: + return "Payment Required"; + case 403: + return "Forbidden"; + case 404: + return "Not Found"; + case 405: + return "Method Not Allowed"; + case 406: + return "Not Acceptable"; + case 407: + return "Proxy Authentication Required"; + case 408: + return "Request Timeout"; + case 409: + return "Conflict"; + case 410: + return "Gone"; + case 411: + return "Length Required"; + case 412: + return "Precondition Failed"; + case 413: + return "Request Entity Too Large"; + case 414: + return "Request-URI Too Long"; + case 415: + return "Unsupported Media Type"; + case 416: + return "Requested Range Not Satisfiable"; + case 417: + return "Expectation Failed"; + case 418: + return "I'm a Teapot"; + case 422: + return "Unprocessable Entity"; + case 423: + return "Locked"; + case 424: + return "Failed Dependency"; + case 426: + return "Upgrade Required"; + case 500: + return "Internal Server Error"; + case 501: + return "Not Implemented"; + case 502: + return "Bad Gateway"; + case 503: + return "Service Unavailable"; + case 504: + return "Gateway Timeout"; + case 505: + return "HTTP Version Not Supported"; + case 506: + return "Variant Also Negotiates"; + case 507: + return "Insufficient Storage"; + case 510: + return "Not Extended"; + default: + return null; + } + } + } + +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs new file mode 100644 index 0000000000..5016a9881f --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Runtime.InteropServices; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + public class SocketInput + { + private readonly IMemoryPool _memory; + private GCHandle _gcHandle; + + public SocketInput(IMemoryPool memory) + { + _memory = memory; + Buffer = new ArraySegment(_memory.Empty, 0, 0); + } + + public ArraySegment Buffer { get; set; } + + public bool RemoteIntakeFin { get; set; } + + + public void Skip(int count) + { + Buffer = new ArraySegment(Buffer.Array, Buffer.Offset + count, Buffer.Count - count); + } + + public ArraySegment Take(int count) + { + var taken = new ArraySegment(Buffer.Array, Buffer.Offset, count); + Skip(count); + return taken; + } + + public void Free() + { + if (Buffer.Count == 0 && Buffer.Array.Length != 0) + { + _memory.FreeByte(Buffer.Array); + Buffer = new ArraySegment(_memory.Empty, 0, 0); + } + } + + public ArraySegment Available(int minimumSize) + { + if (Buffer.Count == 0 && Buffer.Offset != 0) + { + Buffer = new ArraySegment(Buffer.Array, 0, 0); + } + + var availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count; + + if (availableSize < minimumSize) + { + if (availableSize + Buffer.Offset >= minimumSize) + { + Array.Copy(Buffer.Array, Buffer.Offset, Buffer.Array, 0, Buffer.Count); + if (Buffer.Count != 0) + { + Buffer = new ArraySegment(Buffer.Array, 0, Buffer.Count); + } + availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count; + } + else + { + var largerSize = Buffer.Array.Length + Math.Max(Buffer.Array.Length, minimumSize); + var larger = new ArraySegment(_memory.AllocByte(largerSize), 0, Buffer.Count); + if (Buffer.Count != 0) + { + Array.Copy(Buffer.Array, Buffer.Offset, larger.Array, 0, Buffer.Count); + } + _memory.FreeByte(Buffer.Array); + Buffer = larger; + availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count; + } + } + return new ArraySegment(Buffer.Array, Buffer.Offset + Buffer.Count, availableSize); + } + + public void Extend(int count) + { + Debug.Assert(count >= 0); + Debug.Assert(Buffer.Offset >= 0); + Debug.Assert(Buffer.Offset <= Buffer.Array.Length); + Debug.Assert(Buffer.Offset + Buffer.Count <= Buffer.Array.Length); + Debug.Assert(Buffer.Offset + Buffer.Count + count <= Buffer.Array.Length); + + Buffer = new ArraySegment(Buffer.Array, Buffer.Offset, Buffer.Count + count); + } + public IntPtr Pin(int minimumSize) + { + var segment = Available(minimumSize); + _gcHandle = GCHandle.Alloc(segment.Array, GCHandleType.Pinned); + return _gcHandle.AddrOfPinnedObject() + segment.Offset; + } + public void Unpin(int count) + { + _gcHandle.Free(); + Extend(count); + } + + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs new file mode 100644 index 0000000000..d33719c57d --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -0,0 +1,106 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Server.Kestrel.Networking; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.Sockets; +using System.Runtime.InteropServices; + +namespace Microsoft.AspNet.Server.Kestrel.Http +{ + /// + /// Operations performed for buffered socket output + /// + public interface ISocketOutput + { + void Write(ArraySegment buffer, Action callback, object state); + } + + public class SocketOutput : ISocketOutput + { + private readonly KestrelThread _thread; + private readonly UvStreamHandle _socket; + + public SocketOutput(KestrelThread thread, UvStreamHandle socket) + { + _thread = thread; + _socket = socket; + } + + public void Write(ArraySegment buffer, Action callback, object state) + { + var req = new ThisWriteReq(); + req.Init(_thread.Loop); + req.Contextualize(this, _socket, buffer, callback, state); + _thread.Post(x => + { + ((ThisWriteReq)x).Write(); + }, req); + } + + public class ThisWriteReq : UvWriteReq + { + private static readonly Action _writeCallback = WriteCallback; + private static void WriteCallback(UvWriteReq req, int status, object state) + { + ((ThisWriteReq)state).OnWrite(req, status); + } + + SocketOutput _self; + ArraySegment _buffer; + Action _drained; + UvStreamHandle _socket; + Action _callback; + object _state; + GCHandle _pin; + + internal void Contextualize( + SocketOutput socketOutput, + UvStreamHandle socket, + ArraySegment buffer, + Action callback, + object state) + { + _self = socketOutput; + _socket = socket; + _buffer = buffer; + _callback = callback; + _state = state; + } + + public void Write() + { + _pin = GCHandle.Alloc(_buffer.Array, GCHandleType.Pinned); + var buf = new Libuv.uv_buf_t + { + len = (uint)_buffer.Count, + memory = _pin.AddrOfPinnedObject() + _buffer.Offset + }; + + Write( + _socket, + new[] { buf }, + 1, + _writeCallback, + this); + } + + private void OnWrite(UvWriteReq req, int status) + { + _pin.Free(); + //NOTE: pool this? + Close(); + _callback(_state); + } + } + + + public bool Flush(Action drained) + { + return false; + } + + } +} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs index 9bb2a7f4bd..8e61adb824 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/Disposable.cs @@ -1,4 +1,7 @@ -using System; +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; namespace Microsoft.AspNet.Server.Kestrel { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 83dd84a4a5..43d09a593c 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -1,4 +1,7 @@ -using Microsoft.AspNet.Server.Kestrel.Networking; +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Server.Kestrel.Networking; using System; using System.Collections.Generic; using System.Threading; diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs index b404d6eb63..add92c5f37 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs @@ -1,9 +1,12 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; using Microsoft.AspNet.Server.Kestrel.Networking; using System.Threading; using System.Collections.Generic; using System.Threading.Tasks; +using Microsoft.AspNet.Server.Kestrel.Http; namespace Microsoft.AspNet.Server.Kestrel { @@ -14,11 +17,13 @@ namespace Microsoft.AspNet.Server.Kestrel { Threads = new List(); Listeners = new List(); + Memory = new MemoryPool(); Libuv = new Libuv(); Libuv.Load("libuv.dll"); } public Libuv Libuv { get; private set; } + public IMemoryPool Memory { get; set; } public List Threads { get; private set; } public List Listeners { get; private set; } @@ -44,13 +49,13 @@ namespace Microsoft.AspNet.Server.Kestrel Threads.Clear(); } - public IDisposable CreateServer() + public IDisposable CreateServer(Func app) { var listeners = new List(); foreach (var thread in Threads) { - var listener = new Listener(thread); - listener.StartAsync().Wait(); + var listener = new Listener(Memory); + listener.StartAsync(thread, app).Wait(); listeners.Add(listener); } return new Disposable(() => diff --git a/src/Microsoft.AspNet.Server.Kestrel/Microsoft.AspNet.Server.Kestrel.kproj b/src/Microsoft.AspNet.Server.Kestrel/Microsoft.AspNet.Server.Kestrel.kproj new file mode 100644 index 0000000000..ca3a199436 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Microsoft.AspNet.Server.Kestrel.kproj @@ -0,0 +1,57 @@ + + + + 12.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + f510611a-3bee-4b88-a613-5f4a74ed82a1 + Library + + + ConsoleDebugger + + + WebDebugger + + + + + + + 2.0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs index 17a2299a44..a9e5dd7676 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs @@ -177,6 +177,50 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking Check(_uv_read_stop(handle)); } + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate int uv_try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs); + uv_try_write _uv_try_write; + public int try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs) + { + return Check(_uv_try_write(handle, bufs, nbufs)); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + public delegate void uv_write_cb(IntPtr req, int status); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate int uv_write(UvWriteReq req, UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs, uv_write_cb cb); + uv_write _uv_write; + public void write(UvWriteReq req, UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs, uv_write_cb cb) + { + Check(_uv_write(req, handle, bufs, nbufs, cb)); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + public delegate void uv_shutdown_cb(IntPtr req, int status); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate int uv_shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb); + uv_shutdown _uv_shutdown; + public void shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb) + { + Check(_uv_shutdown(req, handle, cb)); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate int uv_handle_size(int handleType); + uv_handle_size _uv_handle_size; + public int handle_size(int handleType) + { + return _uv_handle_size(handleType); + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + delegate int uv_req_size(int handleType); + uv_req_size _uv_req_size; + public int req_size(int handleType) + { + return _uv_req_size(handleType); + } + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] delegate int uv_ip4_addr(string ip, int port, out sockaddr addr); @@ -209,5 +253,9 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking public IntPtr memory; } + //int handle_size_async; + //int handle_size_tcp; + //int req_size_write; + //int req_size_shutdown; } } \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs index 033c649484..e76dac9176 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs @@ -2,38 +2,15 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Runtime.InteropServices; namespace Microsoft.AspNet.Server.Kestrel.Networking { - public abstract class UvHandle : SafeHandle + public abstract class UvHandle : UvMemory { - protected Libuv _uv; static Libuv.uv_close_cb _close_cb = DestroyHandle; - public UvHandle() : base(IntPtr.Zero, true) - { - } - public override bool IsInvalid - { - get - { - return handle == IntPtr.Zero; - } - } - unsafe protected void CreateHandle(Libuv uv, int size) - { - _uv = uv; - handle = Marshal.AllocCoTaskMem(size); - *(IntPtr*)handle = GCHandle.ToIntPtr(GCHandle.Alloc(this)); - } - - protected void CreateHandle(UvLoopHandle loop, int size) - { - CreateHandle(loop._uv, size); - } protected override bool ReleaseHandle() { var memory = handle; @@ -45,22 +22,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking return true; } - unsafe protected static void DestroyHandle(IntPtr memory) - { - var gcHandlePtr = *(IntPtr*)memory; - if (gcHandlePtr != IntPtr.Zero) - { - GCHandle.FromIntPtr(gcHandlePtr).Free(); - } - Marshal.FreeCoTaskMem(memory); - } - - unsafe public static THandle FromIntPtr(IntPtr handle) - { - GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle); - return (THandle)gcHandle.Target; - } - public void Reference() { _uv.@ref(this); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvLoopHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvLoopHandle.cs index 73bab4b5f7..ec87e7db56 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvLoopHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvLoopHandle.cs @@ -13,7 +13,6 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _uv.loop_init(this); } - public int Run(int mode = 0) { return _uv.run(this, mode); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs new file mode 100644 index 0000000000..145b286fe1 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Runtime.InteropServices; + +namespace Microsoft.AspNet.Server.Kestrel.Networking +{ + /// + /// Summary description for UvMemory + /// + public abstract class UvMemory : SafeHandle + { + protected Libuv _uv; + public UvMemory() : base(IntPtr.Zero, true) + { + } + + public Libuv Libuv { get { return _uv; } } + + public override bool IsInvalid + { + get + { + return handle == IntPtr.Zero; + } + } + + unsafe protected void CreateHandle(Libuv uv, int size) + { + _uv = uv; + handle = Marshal.AllocCoTaskMem(size); + *(IntPtr*)handle = GCHandle.ToIntPtr(GCHandle.Alloc(this)); + } + + protected void CreateHandle(UvLoopHandle loop, int size) + { + CreateHandle(loop._uv, size); + } + + unsafe protected static void DestroyHandle(IntPtr memory) + { + var gcHandlePtr = *(IntPtr*)memory; + if (gcHandlePtr != IntPtr.Zero) + { + GCHandle.FromIntPtr(gcHandlePtr).Free(); + } + Marshal.FreeCoTaskMem(memory); + } + + unsafe public static THandle FromIntPtr(IntPtr handle) + { + GCHandle gcHandle = GCHandle.FromIntPtr(*(IntPtr*)handle); + return (THandle)gcHandle.Target; + } + + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs new file mode 100644 index 0000000000..2bc23468e2 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; + +namespace Microsoft.AspNet.Server.Kestrel.Networking +{ + /// + /// Summary description for UvShutdownRequest + /// + public class UvShutdownReq : UvReq + { + private readonly static Libuv.uv_shutdown_cb _uv_shutdown_cb = UvShutdownCb; + + Action _callback; + object _state; + + public void Init(UvLoopHandle loop) + { + CreateHandle(loop.Libuv, loop.Libuv.req_size(3)); + } + + public void Shutdown(UvStreamHandle handle, Action callback, object state) + { + _callback = callback; + _state = state; + _uv.shutdown(this, handle, _uv_shutdown_cb); + } + + private static void UvShutdownCb(IntPtr ptr, int status) + { + var req = FromIntPtr(ptr); + req._callback(req, status, req._state); + req._callback = null; + req._state = null; + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs index e4d33aa849..7f2ff901e7 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvStreamHandle.cs @@ -15,12 +15,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking public Action _connectionCallback; public object _connectionState; - public Action _readCallback; + public Func _allocCallback; + + public Action _readCallback; public object _readState; - public UvStreamHandle() - { - } public void Listen(int backlog, Action callback, object state) { @@ -35,10 +34,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking } public void ReadStart( - Action callback, + Func allocCallback, + Action readCallback, object state) { - _readCallback = callback; + _allocCallback = allocCallback; + _readCallback = readCallback; _readState = state; _uv.read_start(this, _uv_alloc_cb, _uv_read_cb); } @@ -48,38 +49,38 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking _uv.read_stop(this); } + public int TryWrite(Libuv.uv_buf_t buf) + { + return _uv.try_write(this, new[] { buf }, 1); + } + + private static void UvConnectionCb(IntPtr handle, int status) { var stream = FromIntPtr(handle); stream._connectionCallback(stream, status, stream._connectionState); } - private static void UvAllocCb(IntPtr server, int suggested_size, out Libuv.uv_buf_t buf) + private static void UvAllocCb(IntPtr handle, int suggested_size, out Libuv.uv_buf_t buf) { - buf = new Libuv.uv_buf_t - { - memory = Marshal.AllocCoTaskMem(suggested_size), - len = (uint)suggested_size, - }; - + var stream = FromIntPtr(handle); + buf = stream._allocCallback(stream, suggested_size, stream._readState); } - private static void UvReadCb(IntPtr ptr, int nread, ref Libuv.uv_buf_t buf) + private static void UvReadCb(IntPtr handle, int nread, ref Libuv.uv_buf_t buf) { - var stream = FromIntPtr(ptr); + var stream = FromIntPtr(handle); + if (nread == -4095) { - stream._readCallback(stream, 0, null, stream._readState); - Marshal.FreeCoTaskMem(buf.memory); + stream._readCallback(stream, 0, stream._readState); return; } var length = stream._uv.Check(nread); - var data = new byte[length]; - Marshal.Copy(buf.memory, data, 0, length); - Marshal.FreeCoTaskMem(buf.memory); - stream._readCallback(stream, length, data, stream._readState); + stream._readCallback(stream, nread, stream._readState); } + } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteRequest.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteRequest.cs new file mode 100644 index 0000000000..17381696c5 --- /dev/null +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteRequest.cs @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Runtime.InteropServices; + +namespace Microsoft.AspNet.Server.Kestrel.Networking +{ + /// + /// Summary description for UvWriteRequest + /// + public class UvWriteReq : UvReq + { + private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + + Action _callback; + object _state; + + public void Init(UvLoopHandle loop) + { + CreateHandle(loop.Libuv, loop.Libuv.req_size(2)); + } + + public void Write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs, Action callback, object state) + { + _callback = callback; + _state = state; + _uv.write(this, handle, bufs, nbufs, _uv_write_cb); + } + + private static void UvWriteCb(IntPtr ptr, int status) + { + var req = FromIntPtr(ptr); + req._callback(req, status, req._state); + req._callback = null; + req._state = null; + } + } + + public abstract class UvReq : UvMemory + { + + unsafe protected void CreateHandle(Libuv uv, int size) + { + _uv = uv; + handle = Marshal.AllocCoTaskMem(size); + *(IntPtr*)handle = GCHandle.ToIntPtr(GCHandle.Alloc(this)); + } + + protected override bool ReleaseHandle() + { + DestroyHandle(handle); + handle = IntPtr.Zero; + return true; + } + } +} \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/Project.json b/src/Microsoft.AspNet.Server.Kestrel/Project.json index 0802ababfa..cb7ece47c7 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Project.json +++ b/src/Microsoft.AspNet.Server.Kestrel/Project.json @@ -1,6 +1,7 @@ { "version": "0.1-alpha-*", "dependencies": { + "Microsoft.AspNet.Hosting": "0.1-*" }, "compilationOptions": { "allowUnsafe": true diff --git a/src/SampleApp/Program.cs b/src/SampleApp/Program.cs new file mode 100644 index 0000000000..446628b570 --- /dev/null +++ b/src/SampleApp/Program.cs @@ -0,0 +1,36 @@ +using System; +using System.Threading.Tasks; + +namespace SampleApp +{ + public class Program + { + public static void Main(string[] args) + { + var engine = new Microsoft.AspNet.Server.Kestrel.KestrelEngine(); + engine.Start(1); + using (var server = engine.CreateServer(App)) + { + Console.WriteLine("Hello World"); + Console.ReadLine(); + } + engine.Stop(); + } + + private static async Task App(object arg) + { + var httpContext = new Microsoft.AspNet.PipelineCore.DefaultHttpContext( + new Microsoft.AspNet.FeatureModel.FeatureCollection( + new Microsoft.AspNet.FeatureModel.FeatureObject(arg))); + + Console.WriteLine("{0} {1}{2}{3}", + httpContext.Request.Method, + httpContext.Request.PathBase, + httpContext.Request.Path, + httpContext.Request.QueryString); + + httpContext.Response.ContentType = "text/plain"; + await httpContext.Response.WriteAsync("Hello world"); + } + } +} diff --git a/src/SampleApp/SampleApp.kproj b/src/SampleApp/SampleApp.kproj new file mode 100644 index 0000000000..7452dd8c38 --- /dev/null +++ b/src/SampleApp/SampleApp.kproj @@ -0,0 +1,35 @@ + + + + 12.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + Debug + AnyCPU + + + + 2c3cb3dc-eebf-4f52-9e1c-4f2f972e76c3 + Console + + + ConsoleDebugger + + + WebDebugger + + + + + 2.0 + + + + + + + + + + \ No newline at end of file diff --git a/src/SampleApp/libuv.dll b/src/SampleApp/libuv.dll new file mode 100644 index 0000000000..d3a5834718 Binary files /dev/null and b/src/SampleApp/libuv.dll differ diff --git a/src/SampleApp/project.json b/src/SampleApp/project.json new file mode 100644 index 0000000000..c1110915de --- /dev/null +++ b/src/SampleApp/project.json @@ -0,0 +1,14 @@ +{ + "dependencies": { + "Microsoft.AspNet.Server.Kestrel": "0.1-alpha-*", + "Microsoft.AspNet.PipelineCore": "0.1-alpha-*" + }, + "configurations" : { + "net45" : { }, + "k10" : { + "dependencies": { + "System.Console": "4.0.0.0" + } + } + } +} diff --git a/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs index 3287054606..4c5b7e6887 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/EngineTests.cs @@ -1,5 +1,9 @@ -using Microsoft.AspNet.Server.Kestrel; +using Microsoft.AspNet.HttpFeature; +using Microsoft.AspNet.Server.Kestrel; using System; +using System.Net; +using System.Net.Sockets; +using System.Text; using System.Threading.Tasks; using Xunit; @@ -10,21 +14,61 @@ namespace Microsoft.AspNet.Server.KestralTests /// public class EngineTests { - [Fact] + private async Task App(object callContext) + { + var request = callContext as IHttpRequestFeature; + var response = callContext as IHttpResponseFeature; + for (; ;) + { + var buffer = new byte[8192]; + var count = await request.Body.ReadAsync(buffer, 0, buffer.Length); + if (count == 0) + { + break; + } + await response.Body.WriteAsync(buffer, 0, count); + } + } + + [Fact] public async Task EngineCanStartAndStop() { var engine = new KestrelEngine(); engine.Start(1); engine.Stop(); } + [Fact] public async Task ListenerCanCreateAndDispose() { var engine = new KestrelEngine(); engine.Start(1); - var started = engine.CreateServer(); + var started = engine.CreateServer(App); + started.Dispose(); + engine.Stop(); + } + + + [Fact] + public async Task ConnectionCanReadAndWrite() + { + var engine = new KestrelEngine(); + engine.Start(1); + var started = engine.CreateServer(App); + + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Connect(new IPEndPoint(IPAddress.Loopback, 4001)); + socket.Send(Encoding.ASCII.GetBytes("POST / HTTP/1.0\r\n\r\nHello World")); + socket.Shutdown(SocketShutdown.Send); + var buffer = new byte[8192]; + for (; ;) + { + var length = socket.Receive(buffer); + if (length == 0) { break; } + var text = Encoding.ASCII.GetString(buffer, 0, length); + } started.Dispose(); engine.Stop(); } } -} \ No newline at end of file +} diff --git a/test/Microsoft.AspNet.Server.KestralTests/MessageBodyExchangerTests.cs b/test/Microsoft.AspNet.Server.KestralTests/MessageBodyExchangerTests.cs new file mode 100644 index 0000000000..db8ff8218f --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestralTests/MessageBodyExchangerTests.cs @@ -0,0 +1,190 @@ +using Microsoft.AspNet.Server.Kestrel; +using Microsoft.AspNet.Server.Kestrel.Http; +using System; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.AspNet.Server.KestralTests +{ + /// + /// Summary description for MessageBodyExchangerTests + /// + public class MessageBodyExchangerTests + { + [Fact] + public async Task CallingReadAsyncBeforeTransfer() + { + var testInput = new TestInput(); + var context = new ConnectionContext(); + context.SocketInput = new SocketInput(new MemoryPool()); + + var exchanger = new MessageBodyExchanger(testInput.ConnectionContext); + + var buffer1 = new byte[1024]; + var buffer2 = new byte[1024]; + var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); + var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); + + Assert.False(task1.IsCompleted); + Assert.False(task2.IsCompleted); + + testInput.Add("Hello"); + + exchanger.Transfer(3, false); + + var count1 = await task1; + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); + + exchanger.Transfer(2, false); + + var count2 = await task2; + + Assert.True(task1.IsCompleted); + Assert.True(task2.IsCompleted); + AssertASCII("lo", new ArraySegment(buffer2, 0, count2)); + } + + [Fact] + public async Task CallingTransferBeforeReadAsync() + { + var testInput = new TestInput(); + var context = new ConnectionContext(); + context.SocketInput = new SocketInput(new MemoryPool()); + + var exchanger = new MessageBodyExchanger(testInput.ConnectionContext); + + testInput.Add("Hello"); + + exchanger.Transfer(5, false); + + var buffer1 = new byte[1024]; + var buffer2 = new byte[1024]; + var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); + var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + + await task1; + + var count1 = await task1; + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + AssertASCII("Hello", new ArraySegment(buffer1, 0, count1)); + } + + [Fact] + public async Task TransferZeroBytesDoesNotReleaseReadAsync() + { + var testInput = new TestInput(); + var context = new ConnectionContext(); + context.SocketInput = new SocketInput(new MemoryPool()); + + var exchanger = new MessageBodyExchanger(testInput.ConnectionContext); + + var buffer1 = new byte[1024]; + var buffer2 = new byte[1024]; + var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); + var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); + + Assert.False(task1.IsCompleted); + Assert.False(task2.IsCompleted); + + testInput.Add("Hello"); + + exchanger.Transfer(3, false); + + var count1 = await task1; + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); + + exchanger.Transfer(0, false); + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + } + + [Fact] + public async Task TransferFinDoesReleaseReadAsync() + { + var testInput = new TestInput(); + var context = new ConnectionContext(); + context.SocketInput = new SocketInput(new MemoryPool()); + + var exchanger = new MessageBodyExchanger(testInput.ConnectionContext); + + var buffer1 = new byte[1024]; + var buffer2 = new byte[1024]; + var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); + var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); + + Assert.False(task1.IsCompleted); + Assert.False(task2.IsCompleted); + + testInput.Add("Hello"); + + exchanger.Transfer(3, false); + + var count1 = await task1; + + Assert.True(task1.IsCompleted); + Assert.False(task2.IsCompleted); + AssertASCII("Hel", new ArraySegment(buffer1, 0, count1)); + + exchanger.Transfer(0, true); + + var count2 = await task2; + + Assert.True(task1.IsCompleted); + Assert.True(task2.IsCompleted); + Assert.Equal(0, count2); + } + + + [Fact] + public async Task TransferFinFirstDoesReturnsCompletedReadAsyncs() + { + + var testInput = new TestInput(); + var context = new ConnectionContext(); + context.SocketInput = new SocketInput(new MemoryPool()); + + var exchanger = new MessageBodyExchanger(testInput.ConnectionContext); + + testInput.Add("Hello"); + + exchanger.Transfer(5, true); + + var buffer1 = new byte[1024]; + var buffer2 = new byte[1024]; + var task1 = exchanger.ReadAsync(new ArraySegment(buffer1)); + var task2 = exchanger.ReadAsync(new ArraySegment(buffer2)); + + Assert.True(task1.IsCompleted); + Assert.True(task2.IsCompleted); + + var count1 = await task1; + var count2 = await task2; + + AssertASCII("Hello", new ArraySegment(buffer1, 0, count1)); + Assert.Equal(0, count2); + } + + private void AssertASCII(string expected, ArraySegment actual) + { + var encoding = System.Text.Encoding.ASCII; + var bytes = encoding.GetBytes(expected); + Assert.Equal(bytes.Length, actual.Count); + for (var index = 0; index != bytes.Length; ++index) + { + Assert.Equal(bytes[index], actual.Array[actual.Offset + index]); + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestralTests/MessageBodyTests.cs b/test/Microsoft.AspNet.Server.KestralTests/MessageBodyTests.cs new file mode 100644 index 0000000000..db1e3e3f89 --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestralTests/MessageBodyTests.cs @@ -0,0 +1,64 @@ +using Microsoft.AspNet.Server.Kestrel.Http; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.AspNet.Server.KestralTests +{ + /// + /// Summary description for MessageBodyTests + /// + public class MessageBodyTests + { + [Fact] + public async Task Http10ConnectionClose() + { + var input = new TestInput(); + var body = MessageBody.For("HTTP/1.0", new Dictionary(), input.ConnectionContext); + var stream = new FrameRequestStream(body); + + input.Add("Hello", true); + body.Consume(); + + var buffer1 = new byte[1024]; + var count1 = stream.Read(buffer1, 0, 1024); + AssertASCII("Hello", new ArraySegment(buffer1, 0, 5)); + + var buffer2 = new byte[1024]; + var count2 = stream.Read(buffer2, 0, 1024); + Assert.Equal(0, count2); + } + + [Fact] + public async Task Http10ConnectionCloseAsync() + { + var input = new TestInput(); + var body = MessageBody.For("HTTP/1.0", new Dictionary(), input.ConnectionContext); + var stream = new FrameRequestStream(body); + + input.Add("Hello", true); + body.Consume(); + + var buffer1 = new byte[1024]; + var count1 = await stream.ReadAsync(buffer1, 0, 1024); + AssertASCII("Hello", new ArraySegment(buffer1, 0, 5)); + + var buffer2 = new byte[1024]; + var count2 = await stream.ReadAsync(buffer2, 0, 1024); + Assert.Equal(0, count2); + } + + private void AssertASCII(string expected, ArraySegment actual) + { + var encoding = Encoding.ASCII; + var bytes = encoding.GetBytes(expected); + Assert.Equal(bytes.Length, actual.Count); + for (var index = 0; index != bytes.Length; ++index) + { + Assert.Equal(bytes[index], actual.Array[actual.Offset + index]); + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestralTests/Microsoft.AspNet.Server.KestralTests.kproj b/test/Microsoft.AspNet.Server.KestralTests/Microsoft.AspNet.Server.KestralTests.kproj new file mode 100644 index 0000000000..1458edb016 --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestralTests/Microsoft.AspNet.Server.KestralTests.kproj @@ -0,0 +1,37 @@ + + + + 12.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + 37f3bfb2-6454-49e5-9d7f-581bf755ccfe + Console + + + ConsoleDebugger + + + WebDebugger + + + + + + + 2.0 + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs index 2f17803fd4..4ccd07526c 100644 --- a/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestralTests/NetworkingTests.cs @@ -5,6 +5,7 @@ using Microsoft.AspNet.Server.Kestrel.Networking; using System; using System.Net; using System.Net.Sockets; +using System.Runtime.InteropServices; using System.Threading.Tasks; using Xunit; @@ -113,14 +114,18 @@ namespace Microsoft.AspNet.Server.KestralTests var tcp2 = new UvTcpHandle(); tcp2.Init(loop); tcp.Accept(tcp2); - tcp2.ReadStart((__, nread, data, state2) => - { - bytesRead += nread; - if (nread == 0) + var data = Marshal.AllocCoTaskMem(500); + tcp2.ReadStart( + (a, b, c) => new Libuv.uv_buf_t { memory = data, len = 500 }, + (__, nread, state2) => { - tcp2.Close(); - } - }, null); + bytesRead += nread; + if (nread == 0) + { + tcp2.Close(); + } + }, + null); tcp.Close(); }, null); var t = Task.Run(async () => diff --git a/test/Microsoft.AspNet.Server.KestralTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestralTests/TestInput.cs new file mode 100644 index 0000000000..4e920c8192 --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestralTests/TestInput.cs @@ -0,0 +1,34 @@ +using System; +using Microsoft.AspNet.Server.Kestrel.Http; + +namespace Microsoft.AspNet.Server.KestralTests +{ + class TestInput + { + public TestInput() + { + var memory = new MemoryPool(); + ConnectionContext = new ConnectionContext + { + SocketInput = new SocketInput(memory), + Memory = memory, + }; + + } + public ConnectionContext ConnectionContext { get; set; } + + public void Add(string text, bool fin = false) + { + var encoding = System.Text.Encoding.ASCII; + var count = encoding.GetByteCount(text); + var buffer = ConnectionContext.SocketInput.Available(text.Length); + count = encoding.GetBytes(text, 0, text.Length, buffer.Array, buffer.Offset); + ConnectionContext.SocketInput.Extend(count); + if (fin) + { + ConnectionContext.SocketInput.RemoteIntakeFin = true; + } + } + } +} +