More cleanup

This commit is contained in:
David Fowler 2016-09-30 02:41:07 -07:00
parent 03352354dc
commit 27ddb7de90
12 changed files with 217 additions and 85 deletions

View File

@ -4,22 +4,22 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Channels;
namespace WebApplication95
{
public class Message
{
public string ContentType { get; set; }
public ArraySegment<byte> Payload { get; set; }
public ReadableBuffer Payload { get; set; }
}
public class Bus
{
private readonly ConcurrentDictionary<string, List<IObserver<Message>>> _subscriptions = new ConcurrentDictionary<string, List<IObserver<Message>>>();
private readonly ConcurrentDictionary<string, List<Func<Message, Task>>> _subscriptions = new ConcurrentDictionary<string, List<Func<Message, Task>>>();
public IDisposable Subscribe(string key, IObserver<Message> observer)
public IDisposable Subscribe(string key, Func<Message, Task> observer)
{
var connections = _subscriptions.GetOrAdd(key, _ => new List<IObserver<Message>>());
var connections = _subscriptions.GetOrAdd(key, _ => new List<Func<Message, Task>>());
connections.Add(observer);
return new DisposableAction(() =>
@ -28,14 +28,14 @@ namespace WebApplication95
});
}
public void Publish(string key, Message message)
public async Task Publish(string key, Message message)
{
List<IObserver<Message>> connections;
List<Func<Message, Task>> connections;
if (_subscriptions.TryGetValue(key, out connections))
{
foreach (var c in connections)
{
c.OnNext(message);
await c(message);
}
}
}

View File

@ -1,22 +1,47 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Primitives;
using WebApplication95.Routing;
namespace WebApplication95
{
public static class DispatcherExtensions
{
public static IApplicationBuilder UseRealTimeConnections(this IApplicationBuilder app, Action<Dispatcher> callback)
{
var dispatcher = new Dispatcher(app);
callback(dispatcher);
app.UseRouter(dispatcher.GetRouter());
return app;
}
}
public class Dispatcher
{
private readonly ConnectionManager _manager = new ConnectionManager();
private readonly EndPoint _endpoint = new EndPoint();
private readonly RouteBuilder _routes;
public async Task Execute(HttpContext context)
public Dispatcher(IApplicationBuilder app)
{
if (context.Request.Path.StartsWithSegments("/send"))
_routes = new RouteBuilder(app);
}
public void MapEndPoint<TEndPoint>(string path) where TEndPoint : EndPoint
{
_routes.AddPrefixRoute(path, new RouteHandler(c => Execute<TEndPoint>(path, c)));
}
public IRouter GetRouter() => _routes.Build();
public async Task Execute<TEndPoint>(string path, HttpContext context) where TEndPoint : EndPoint
{
if (context.Request.Path.StartsWithSegments(path + "/send"))
{
var connectionId = context.Request.Query["id"];
@ -34,17 +59,19 @@ namespace WebApplication95
}
else
{
var endpoint = (EndPoint)context.RequestServices.GetRequiredService<TEndPoint>();
var connectionId = _manager.GetConnectionId(context);
// Outgoing channels
if (context.Request.Path.StartsWithSegments("/sse"))
if (context.Request.Path.StartsWithSegments(path + "/sse"))
{
ConnectionState state;
_manager.AddConnection(connectionId, out state);
var sse = new ServerSentEvents(state);
var ignore = _endpoint.OnConnected(state.Connection);
var ignore = endpoint.OnConnected(state.Connection);
state.Connection.TransportType = TransportType.ServerSentEvents;
@ -54,14 +81,14 @@ namespace WebApplication95
_manager.RemoveConnection(connectionId);
}
else if (context.Request.Path.StartsWithSegments("/ws"))
else if (context.Request.Path.StartsWithSegments(path + "/ws"))
{
ConnectionState state;
_manager.AddConnection(connectionId, out state);
var ws = new WebSockets(state);
var ignore = _endpoint.OnConnected(state.Connection);
var ignore = endpoint.OnConnected(state.Connection);
state.Connection.TransportType = TransportType.WebSockets;
@ -71,14 +98,15 @@ namespace WebApplication95
_manager.RemoveConnection(connectionId);
}
else if (context.Request.Path.StartsWithSegments("/poll"))
else if (context.Request.Path.StartsWithSegments(path + "/poll"))
{
ConnectionState state;
bool newConnection = false;
if (_manager.AddConnection(connectionId, out state))
{
newConnection = true;
var ignore = _endpoint.OnConnected(state.Connection);
var ignore = endpoint.OnConnected(state.Connection);
state.Connection.TransportType = TransportType.LongPolling;
}
@ -92,52 +120,4 @@ namespace WebApplication95
}
}
}
public class EndPoint
{
private List<Connection> _connections = new List<Connection>();
public virtual async Task OnConnected(Connection connection)
{
lock (_connections)
{
_connections.Add(connection);
}
// Echo server
while (true)
{
var input = await connection.Input.ReadAsync();
try
{
if (input.IsEmpty && connection.Input.Reading.IsCompleted)
{
break;
}
List<Connection> connections = null;
lock (_connections)
{
connections = _connections;
}
foreach (var c in connections)
{
var output = c.Output.Alloc();
output.Append(ref input);
await output.FlushAsync();
}
}
finally
{
connection.Input.Advance(input.End);
}
}
lock (_connections)
{
_connections.Remove(connection);
}
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Channels;
namespace WebApplication95
{
/// <summary>
/// Represents an end point that multiple connections connect to. For HTTP, endpoints are URLs, for non HTTP it can be a TCP listener (or similar)
/// </summary>
public class EndPoint
{
// This is a stream based API, we might just want to change to a message based API or invent framing
// over this stream based API to do a message based API
public virtual Task OnConnected(Connection connection)
{
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Channels;
namespace WebApplication95.EndPoints
{
public class ChatEndPoint : EndPoint
{
private Bus bus = new Bus();
public override async Task OnConnected(Connection connection)
{
using (bus.Subscribe(nameof(ChatEndPoint), message => OnMessage(message, connection)))
{
while (true)
{
var input = await connection.Input.ReadAsync();
try
{
if (input.IsEmpty && connection.Input.Reading.IsCompleted)
{
break;
}
await bus.Publish(nameof(ChatEndPoint), new Message()
{
Payload = input
});
}
finally
{
connection.Input.Advance(input.End);
}
}
}
connection.Input.CompleteReader();
}
private async Task OnMessage(Message message, Connection connection)
{
var buffer = connection.Output.Alloc();
var payload = message.Payload;
buffer.Append(ref payload);
await buffer.FlushAsync();
}
}
}

View File

@ -1,9 +0,0 @@
using System;
namespace WebApplication95
{
public interface IDispatcher
{
void OnIncoming(ArraySegment<byte> data);
}
}

View File

@ -75,8 +75,10 @@ namespace WebApplication95
if (memory.TryGetArray(out data))
{
await Send(data);
// Advance the buffer one block of memory
_state.Connection.Output.Advance(buffer.Slice(memory.Length).Start);
buffer = buffer.Slice(memory.Length);
_state.Connection.Output.Advance(buffer.Start);
break;
}
}

View File

@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Routing;
namespace WebApplication95.Routing
{
public class PrefixRoute : IRouter
{
private readonly IRouteHandler _target;
private readonly string _prefix;
public PrefixRoute(IRouteHandler target, string prefix)
{
_target = target;
if (prefix == null)
{
prefix = "/";
}
else if (prefix.Length > 0 && prefix[0] != '/')
{
// owin.RequestPath starts with a /
prefix = "/" + prefix;
}
if (prefix.Length > 1 && prefix[prefix.Length - 1] == '/')
{
prefix = prefix.Substring(0, prefix.Length - 1);
}
_prefix = prefix;
}
public Task RouteAsync(RouteContext context)
{
var requestPath = context.HttpContext.Request.Path.Value ?? string.Empty;
if (requestPath.StartsWith(_prefix, StringComparison.OrdinalIgnoreCase))
{
if (requestPath.Length > _prefix.Length)
{
var lastCharacter = requestPath[_prefix.Length];
if (lastCharacter != '/' && lastCharacter != '#' && lastCharacter != '?')
{
return Task.FromResult(0);
}
}
context.Handler = _target.GetRequestHandler(context.HttpContext, context.RouteData);
}
return Task.FromResult(0);
}
public VirtualPathData GetVirtualPath(VirtualPathContext context)
{
return null;
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Routing;
namespace WebApplication95.Routing
{
public static class RouteBuilderExtensions
{
public static IRouteBuilder AddPrefixRoute(
this IRouteBuilder routeBuilder,
string prefix,
IRouteHandler handler)
{
routeBuilder.Routes.Add(new PrefixRoute(handler, prefix));
return routeBuilder;
}
}
}

View File

@ -1,7 +1,10 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using WebApplication95.EndPoints;
using WebApplication95.Routing;
namespace WebApplication95
{
@ -11,6 +14,9 @@ namespace WebApplication95
// For more information on how to configure your application, visit http://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddRouting();
services.AddSingleton<ChatEndPoint>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -25,11 +31,9 @@ namespace WebApplication95
app.UseDeveloperExceptionPage();
}
var dispatcher = new Dispatcher();
app.Run(async (context) =>
app.UseRealTimeConnections(d =>
{
await dispatcher.Execute(context);
d.MapEndPoint<ChatEndPoint>("/chat");
});
}
}

View File

@ -5,6 +5,7 @@
"version": "1.0.0",
"type": "platform"
},
"Microsoft.AspNetCore.Routing": "1.0.0",
"Microsoft.AspNetCore.Diagnostics": "1.0.0",
"Microsoft.AspNetCore.StaticFiles": "1.0.0",

View File

@ -11,7 +11,7 @@
var body = document.getElementById('data').value;
var xhr = new XMLHttpRequest();
var url = '/send?id=' + connectionId;
var url = '/chat/send?id=' + connectionId;
xhr.open("POST", url, true);
xhr.setRequestHeader('Content-type', 'application/json');
xhr.onreadystatechange = function () {
@ -22,7 +22,7 @@
xhr.send(data);
}
var source = new EventSource('/sse');
var source = new EventSource('/chat/sse');
source.onopen = function () {
console.log('Opened!');

View File

@ -11,7 +11,7 @@
var body = document.getElementById('data').value;
var xhr = new XMLHttpRequest();
var url = '/send?id=' + connectionId;
var url = '/chat/send?id=' + connectionId;
xhr.open("POST", url, true);
xhr.setRequestHeader('Content-type', 'application/json');
xhr.onreadystatechange = function () {
@ -25,7 +25,7 @@
function poll(id) {
var xhr = new XMLHttpRequest();
var url = '/poll' + (id == null ? '' : '?id=' + id);
var url = '/chat/poll' + (id == null ? '' : '?id=' + id);
xhr.open("POST", url, true);
xhr.onreadystatechange = function () {
if (xhr.readyState == 4 && xhr.status == 200) {