From 27ddb7de900f4aa258cd3a6d834d0436661ecf05 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 30 Sep 2016 02:41:07 -0700 Subject: [PATCH] More cleanup --- src/WebApplication95/Bus.cs | 16 +-- src/WebApplication95/Dispatcher.cs | 98 ++++++++----------- src/WebApplication95/EndPoint.cs | 21 ++++ .../EndPoints/ChatEndPoint.cs | 51 ++++++++++ src/WebApplication95/IDispatcher.cs | 9 -- src/WebApplication95/LongPolling.cs | 4 +- src/WebApplication95/Routing/PrefixRoute.cs | 61 ++++++++++++ .../Routing/RouteBuilderExtensions.cs | 21 ++++ src/WebApplication95/Startup.cs | 12 ++- src/WebApplication95/project.json | 1 + src/WebApplication95/wwwroot/index.html | 4 +- src/WebApplication95/wwwroot/polling.html | 4 +- 12 files changed, 217 insertions(+), 85 deletions(-) create mode 100644 src/WebApplication95/EndPoint.cs create mode 100644 src/WebApplication95/EndPoints/ChatEndPoint.cs delete mode 100644 src/WebApplication95/IDispatcher.cs create mode 100644 src/WebApplication95/Routing/PrefixRoute.cs create mode 100644 src/WebApplication95/Routing/RouteBuilderExtensions.cs diff --git a/src/WebApplication95/Bus.cs b/src/WebApplication95/Bus.cs index 8a03b6f5f2..b7be6e8818 100644 --- a/src/WebApplication95/Bus.cs +++ b/src/WebApplication95/Bus.cs @@ -4,22 +4,22 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Channels; namespace WebApplication95 { public class Message { - public string ContentType { get; set; } - public ArraySegment Payload { get; set; } + public ReadableBuffer Payload { get; set; } } public class Bus { - private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); + private readonly ConcurrentDictionary>> _subscriptions = new ConcurrentDictionary>>(); - public IDisposable Subscribe(string key, IObserver observer) + public IDisposable Subscribe(string key, Func observer) { - var connections = _subscriptions.GetOrAdd(key, _ => new List>()); + var connections = _subscriptions.GetOrAdd(key, _ => new List>()); connections.Add(observer); return new DisposableAction(() => @@ -28,14 +28,14 @@ namespace WebApplication95 }); } - public void Publish(string key, Message message) + public async Task Publish(string key, Message message) { - List> connections; + List> connections; if (_subscriptions.TryGetValue(key, out connections)) { foreach (var c in connections) { - c.OnNext(message); + await c(message); } } } diff --git a/src/WebApplication95/Dispatcher.cs b/src/WebApplication95/Dispatcher.cs index 19dd8e4d9a..be14340db2 100644 --- a/src/WebApplication95/Dispatcher.cs +++ b/src/WebApplication95/Dispatcher.cs @@ -1,22 +1,47 @@ using System; using System.Collections.Generic; -using System.IO; -using System.Linq; using System.Threading.Tasks; using Channels; +using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Primitives; +using WebApplication95.Routing; namespace WebApplication95 { + public static class DispatcherExtensions + { + public static IApplicationBuilder UseRealTimeConnections(this IApplicationBuilder app, Action callback) + { + var dispatcher = new Dispatcher(app); + callback(dispatcher); + app.UseRouter(dispatcher.GetRouter()); + return app; + } + } + public class Dispatcher { private readonly ConnectionManager _manager = new ConnectionManager(); - private readonly EndPoint _endpoint = new EndPoint(); + private readonly RouteBuilder _routes; - public async Task Execute(HttpContext context) + public Dispatcher(IApplicationBuilder app) { - if (context.Request.Path.StartsWithSegments("/send")) + _routes = new RouteBuilder(app); + } + + public void MapEndPoint(string path) where TEndPoint : EndPoint + { + _routes.AddPrefixRoute(path, new RouteHandler(c => Execute(path, c))); + } + + public IRouter GetRouter() => _routes.Build(); + + public async Task Execute(string path, HttpContext context) where TEndPoint : EndPoint + { + if (context.Request.Path.StartsWithSegments(path + "/send")) { var connectionId = context.Request.Query["id"]; @@ -34,17 +59,19 @@ namespace WebApplication95 } else { + var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); + var connectionId = _manager.GetConnectionId(context); // Outgoing channels - if (context.Request.Path.StartsWithSegments("/sse")) + if (context.Request.Path.StartsWithSegments(path + "/sse")) { ConnectionState state; _manager.AddConnection(connectionId, out state); var sse = new ServerSentEvents(state); - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); state.Connection.TransportType = TransportType.ServerSentEvents; @@ -54,14 +81,14 @@ namespace WebApplication95 _manager.RemoveConnection(connectionId); } - else if (context.Request.Path.StartsWithSegments("/ws")) + else if (context.Request.Path.StartsWithSegments(path + "/ws")) { ConnectionState state; _manager.AddConnection(connectionId, out state); var ws = new WebSockets(state); - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); state.Connection.TransportType = TransportType.WebSockets; @@ -71,14 +98,15 @@ namespace WebApplication95 _manager.RemoveConnection(connectionId); } - else if (context.Request.Path.StartsWithSegments("/poll")) + else if (context.Request.Path.StartsWithSegments(path + "/poll")) { ConnectionState state; bool newConnection = false; if (_manager.AddConnection(connectionId, out state)) { newConnection = true; - var ignore = _endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(state.Connection); + state.Connection.TransportType = TransportType.LongPolling; } @@ -92,52 +120,4 @@ namespace WebApplication95 } } } - - public class EndPoint - { - private List _connections = new List(); - - public virtual async Task OnConnected(Connection connection) - { - lock (_connections) - { - _connections.Add(connection); - } - - // Echo server - while (true) - { - var input = await connection.Input.ReadAsync(); - try - { - if (input.IsEmpty && connection.Input.Reading.IsCompleted) - { - break; - } - - List connections = null; - lock (_connections) - { - connections = _connections; - } - - foreach (var c in connections) - { - var output = c.Output.Alloc(); - output.Append(ref input); - await output.FlushAsync(); - } - } - finally - { - connection.Input.Advance(input.End); - } - } - - lock (_connections) - { - _connections.Remove(connection); - } - } - } } diff --git a/src/WebApplication95/EndPoint.cs b/src/WebApplication95/EndPoint.cs new file mode 100644 index 0000000000..188d166160 --- /dev/null +++ b/src/WebApplication95/EndPoint.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Channels; + +namespace WebApplication95 +{ + /// + /// Represents an end point that multiple connections connect to. For HTTP, endpoints are URLs, for non HTTP it can be a TCP listener (or similar) + /// + public class EndPoint + { + // This is a stream based API, we might just want to change to a message based API or invent framing + // over this stream based API to do a message based API + public virtual Task OnConnected(Connection connection) + { + return Task.CompletedTask; + } + } +} diff --git a/src/WebApplication95/EndPoints/ChatEndPoint.cs b/src/WebApplication95/EndPoints/ChatEndPoint.cs new file mode 100644 index 0000000000..9ded2f2a87 --- /dev/null +++ b/src/WebApplication95/EndPoints/ChatEndPoint.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Channels; + +namespace WebApplication95.EndPoints +{ + public class ChatEndPoint : EndPoint + { + private Bus bus = new Bus(); + + public override async Task OnConnected(Connection connection) + { + using (bus.Subscribe(nameof(ChatEndPoint), message => OnMessage(message, connection))) + { + while (true) + { + var input = await connection.Input.ReadAsync(); + try + { + if (input.IsEmpty && connection.Input.Reading.IsCompleted) + { + break; + } + + await bus.Publish(nameof(ChatEndPoint), new Message() + { + Payload = input + }); + } + finally + { + connection.Input.Advance(input.End); + } + } + } + + connection.Input.CompleteReader(); + } + + private async Task OnMessage(Message message, Connection connection) + { + var buffer = connection.Output.Alloc(); + var payload = message.Payload; + buffer.Append(ref payload); + await buffer.FlushAsync(); + } + } + +} diff --git a/src/WebApplication95/IDispatcher.cs b/src/WebApplication95/IDispatcher.cs deleted file mode 100644 index 2beada37bb..0000000000 --- a/src/WebApplication95/IDispatcher.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace WebApplication95 -{ - public interface IDispatcher - { - void OnIncoming(ArraySegment data); - } -} \ No newline at end of file diff --git a/src/WebApplication95/LongPolling.cs b/src/WebApplication95/LongPolling.cs index 7a6da27114..be1bc6c217 100644 --- a/src/WebApplication95/LongPolling.cs +++ b/src/WebApplication95/LongPolling.cs @@ -75,8 +75,10 @@ namespace WebApplication95 if (memory.TryGetArray(out data)) { await Send(data); + // Advance the buffer one block of memory - _state.Connection.Output.Advance(buffer.Slice(memory.Length).Start); + buffer = buffer.Slice(memory.Length); + _state.Connection.Output.Advance(buffer.Start); break; } } diff --git a/src/WebApplication95/Routing/PrefixRoute.cs b/src/WebApplication95/Routing/PrefixRoute.cs new file mode 100644 index 0000000000..177f7a1af2 --- /dev/null +++ b/src/WebApplication95/Routing/PrefixRoute.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Routing; + +namespace WebApplication95.Routing +{ + public class PrefixRoute : IRouter + { + private readonly IRouteHandler _target; + private readonly string _prefix; + + public PrefixRoute(IRouteHandler target, string prefix) + { + _target = target; + + if (prefix == null) + { + prefix = "/"; + } + else if (prefix.Length > 0 && prefix[0] != '/') + { + // owin.RequestPath starts with a / + prefix = "/" + prefix; + } + + if (prefix.Length > 1 && prefix[prefix.Length - 1] == '/') + { + prefix = prefix.Substring(0, prefix.Length - 1); + } + + _prefix = prefix; + } + + public Task RouteAsync(RouteContext context) + { + var requestPath = context.HttpContext.Request.Path.Value ?? string.Empty; + if (requestPath.StartsWith(_prefix, StringComparison.OrdinalIgnoreCase)) + { + if (requestPath.Length > _prefix.Length) + { + var lastCharacter = requestPath[_prefix.Length]; + if (lastCharacter != '/' && lastCharacter != '#' && lastCharacter != '?') + { + return Task.FromResult(0); + } + } + + context.Handler = _target.GetRequestHandler(context.HttpContext, context.RouteData); + } + + return Task.FromResult(0); + } + + public VirtualPathData GetVirtualPath(VirtualPathContext context) + { + return null; + } + } +} diff --git a/src/WebApplication95/Routing/RouteBuilderExtensions.cs b/src/WebApplication95/Routing/RouteBuilderExtensions.cs new file mode 100644 index 0000000000..45d253efd8 --- /dev/null +++ b/src/WebApplication95/Routing/RouteBuilderExtensions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Routing; + +namespace WebApplication95.Routing +{ + + public static class RouteBuilderExtensions + { + public static IRouteBuilder AddPrefixRoute( + this IRouteBuilder routeBuilder, + string prefix, + IRouteHandler handler) + { + routeBuilder.Routes.Add(new PrefixRoute(handler, prefix)); + return routeBuilder; + } + } +} diff --git a/src/WebApplication95/Startup.cs b/src/WebApplication95/Startup.cs index 8ddf30b1fa..cabfd9d7cc 100644 --- a/src/WebApplication95/Startup.cs +++ b/src/WebApplication95/Startup.cs @@ -1,7 +1,10 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using WebApplication95.EndPoints; +using WebApplication95.Routing; namespace WebApplication95 { @@ -11,6 +14,9 @@ namespace WebApplication95 // For more information on how to configure your application, visit http://go.microsoft.com/fwlink/?LinkID=398940 public void ConfigureServices(IServiceCollection services) { + services.AddRouting(); + + services.AddSingleton(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -25,11 +31,9 @@ namespace WebApplication95 app.UseDeveloperExceptionPage(); } - var dispatcher = new Dispatcher(); - - app.Run(async (context) => + app.UseRealTimeConnections(d => { - await dispatcher.Execute(context); + d.MapEndPoint("/chat"); }); } } diff --git a/src/WebApplication95/project.json b/src/WebApplication95/project.json index 10ee59e745..241e3ec896 100644 --- a/src/WebApplication95/project.json +++ b/src/WebApplication95/project.json @@ -5,6 +5,7 @@ "version": "1.0.0", "type": "platform" }, + "Microsoft.AspNetCore.Routing": "1.0.0", "Microsoft.AspNetCore.Diagnostics": "1.0.0", "Microsoft.AspNetCore.StaticFiles": "1.0.0", diff --git a/src/WebApplication95/wwwroot/index.html b/src/WebApplication95/wwwroot/index.html index a956cdfb66..23d0c1e09f 100644 --- a/src/WebApplication95/wwwroot/index.html +++ b/src/WebApplication95/wwwroot/index.html @@ -11,7 +11,7 @@ var body = document.getElementById('data').value; var xhr = new XMLHttpRequest(); - var url = '/send?id=' + connectionId; + var url = '/chat/send?id=' + connectionId; xhr.open("POST", url, true); xhr.setRequestHeader('Content-type', 'application/json'); xhr.onreadystatechange = function () { @@ -22,7 +22,7 @@ xhr.send(data); } - var source = new EventSource('/sse'); + var source = new EventSource('/chat/sse'); source.onopen = function () { console.log('Opened!'); diff --git a/src/WebApplication95/wwwroot/polling.html b/src/WebApplication95/wwwroot/polling.html index fc75370f12..29a5e4b792 100644 --- a/src/WebApplication95/wwwroot/polling.html +++ b/src/WebApplication95/wwwroot/polling.html @@ -11,7 +11,7 @@ var body = document.getElementById('data').value; var xhr = new XMLHttpRequest(); - var url = '/send?id=' + connectionId; + var url = '/chat/send?id=' + connectionId; xhr.open("POST", url, true); xhr.setRequestHeader('Content-type', 'application/json'); xhr.onreadystatechange = function () { @@ -25,7 +25,7 @@ function poll(id) { var xhr = new XMLHttpRequest(); - var url = '/poll' + (id == null ? '' : '?id=' + id); + var url = '/chat/poll' + (id == null ? '' : '?id=' + id); xhr.open("POST", url, true); xhr.onreadystatechange = function () { if (xhr.readyState == 4 && xhr.status == 200) {