Typed formatters

This commit is contained in:
moozzyk 2016-10-04 18:57:09 -07:00
parent f64c986b5d
commit 8b905907fe
15 changed files with 224 additions and 93 deletions

View File

@ -10,6 +10,11 @@ namespace SocketsSample
{
public class ChatEndPoint : EndPoint
{
public ChatEndPoint()
{
Console.Write(0);
}
public override async Task OnConnected(Connection connection)
{
await Broadcast($"{connection.ConnectionId} connected ({connection.Metadata["transport"]})");

View File

@ -4,8 +4,6 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Channels;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using SocketsSample.Hubs;
@ -77,12 +75,10 @@ namespace SocketsSample
Arguments = args
};
var formatterFactory = _endPoint._serviceProvider.GetRequiredService<IFormatterFactory>();
foreach (var connection in _endPoint.Connections)
{
// TODO: separate serialization from writing to stream
var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, (string)connection.Metadata["formatType"]);
var formatter = _endPoint.GetFormatter<InvocationDescriptor>((string)connection.Metadata["formatType"]);
tasks.Add(formatter.WriteAsync(message, connection.Channel.GetStream()));
}

View File

@ -36,14 +36,18 @@ namespace SocketsSample
RegisterRPCEndPoint(typeof(Echo));
}
protected IFormatter<T> GetFormatter<T>(string format)
{
return _serviceProvider
.GetRequiredService<SocketFormatters>().GetEndPointFormatters(GetType())
.GetFormatter<T>(format);
}
public override async Task OnConnected(Connection connection)
{
// TODO: Dispatch from the caller
await Task.Yield();
var formatterFactory = _serviceProvider.GetRequiredService<IFormatterFactory>();
var formatType = (string)connection.Metadata["formatType"];
var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, formatType);
var formatter = GetFormatter<InvocationDescriptor>((string)connection.Metadata["formatType"]);
while (true)
{
@ -94,7 +98,8 @@ namespace SocketsSample
};
}
await formatter.WriteAsync(result, connection.Channel.GetStream());
var resultFormatter = GetFormatter<InvocationResultDescriptor>((string)connection.Metadata["formatType"]);
await resultFormatter.WriteAsync(result, connection.Channel.GetStream());
}
}

View File

@ -1,31 +1,42 @@
using System;
using System.IO;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample
{
public class RpcTextFormatter : IFormatter
public class InvocationDescriptorLineFormatter: IFormatter<InvocationDescriptor>
{
public async Task<T> ReadAsync<T>(Stream stream)
public async Task<InvocationDescriptor> ReadAsync(Stream stream)
{
var streamReader = new StreamReader(stream);
var line = await streamReader.ReadLineAsync();
var values = line.Split(',');
object x = new InvocationDescriptor
return new InvocationDescriptor
{
Id = values[0].Substring(2),
Method = values[1].Substring(1),
Arguments = values.Skip(2).ToArray()
};
return (T)x;
}
public async Task WriteAsync<T>(T value, Stream stream)
public async Task WriteAsync(InvocationDescriptor value, Stream stream)
{
var msg = $"CI{value.Id},M{value.Method},{string.Join(",", value.Arguments.Select(a => a.ToString()))}\n";
await WriteAsync(stream, msg);
return;
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
/*
var result = value as InvocationResultDescriptor;
if (result != null)
{
@ -39,20 +50,5 @@ namespace SocketsSample
var invocation = value as InvocationDescriptor;
if (invocation != null)
{
var msg = $"CI{invocation.Id},M{invocation.Method},{string.Join(",", invocation.Arguments.Select(a => a.ToString()))}\n";
await WriteAsync(stream, msg);
return;
}
throw new NotImplementedException("Unsupported type");
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
*/
}

View File

@ -0,0 +1,32 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample
{
public class InvocationResultDescriptorLineFormatter : IFormatter<InvocationResultDescriptor>
{
public Task<InvocationResultDescriptor> ReadAsync(Stream stream)
{
throw new NotImplementedException();
}
public async Task WriteAsync(InvocationResultDescriptor value, Stream stream)
{
var msg = $"RI{value.Id}," +
(!string.IsNullOrEmpty(value.Error)
? $"E{value.Error}\n"
: $"R{value?.Result?.ToString() ?? string.Empty}\n");
await WriteAsync(stream, msg);
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
}

View File

@ -1,24 +0,0 @@
using System;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample
{
public class RpcFormatterFactory : IFormatterFactory
{
public IFormatter CreateFormatter(Format format, string formatType)
{
if (format == Format.Text)
{
switch(formatType)
{
case "json":
return new RpcJSonFormatter();
case "line":
return new RpcTextFormatter();
}
}
throw new InvalidOperationException($"No formatter for format '{format}' and formatType 'formatType'.");
}
}
}

View File

@ -6,17 +6,17 @@ using Newtonsoft.Json;
namespace SocketsSample
{
public class RpcJSonFormatter : IFormatter
public class RpcJSonFormatter<T>: IFormatter<T>
{
private JsonSerializer _serializer = new JsonSerializer();
public async Task<T> ReadAsync<T>(Stream stream)
public async Task<T> ReadAsync(Stream stream)
{
var reader = new JsonTextReader(new StreamReader(stream));
return await Task.Run(() => _serializer.Deserialize<T>(reader));
}
public Task WriteAsync<T>(T value, Stream stream)
public Task WriteAsync(T value, Stream stream)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, value);

View File

@ -1,4 +1,6 @@
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
@ -18,7 +20,11 @@ namespace SocketsSample
services.AddSingleton<RpcEndpoint>();
services.AddSingleton<ChatEndPoint>();
services.AddSingleton<IFormatterFactory, RpcFormatterFactory>();
services.AddSingleton<SocketFormatters>();
services.AddSingleton<InvocationDescriptorLineFormatter>();
services.AddSingleton<InvocationResultDescriptorLineFormatter>();
services.AddSingleton<RpcJSonFormatter<InvocationDescriptor>>();
services.AddSingleton<RpcJSonFormatter<InvocationResultDescriptor>>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -33,11 +39,17 @@ namespace SocketsSample
app.UseDeveloperExceptionPage();
}
app.UseSockets(routes =>
app.UseSockets(endpoints =>
{
routes.MapSocketEndpoint<HubEndpoint>("/hubs");
routes.MapSocketEndpoint<ChatEndPoint>("/chat");
routes.MapSocketEndpoint<RpcEndpoint>("/jsonrpc");
endpoints.Configure<HubEndpoint>()
.MapRoute("/hubs")
.MapFormatter<InvocationDescriptor, InvocationDescriptorLineFormatter>("line")
.MapFormatter<InvocationResultDescriptor, InvocationResultDescriptorLineFormatter>("line")
.MapFormatter<InvocationDescriptor, RpcJSonFormatter<InvocationDescriptor>>("json")
.MapFormatter<InvocationResultDescriptor, RpcJSonFormatter<InvocationResultDescriptor>>("json");
endpoints.Configure<ChatEndPoint>().MapRoute("/chat");
endpoints.Configure<RpcEndpoint>().MapRoute("/jsonrpc");
});
}
}

View File

@ -18,22 +18,21 @@
let response = {};
if (document.getElementById('formatType').value == 'line')
{
if (document.getElementById('formatType').value == 'line') {
let parts = evt.data.split(',');
if (evt.data[0] == 'R')
{
if (evt.data[0] == 'R') {
response.Id = parts[0].slice(2);
response.Result = parts[1].slice(1);
}
else
{
else if (evt.data[0] == 'C') {
response.Method = parts[1].slice(1);
response.Arguments = parts.slice(2).join();
}
else {
response.error = parts[0].slice(1);
}
}
else
{
else {
response = JSON.parse(evt.data);
}

View File

@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Claims;
using System.Threading.Tasks;
using System.Security.Claims;
using Channels;
namespace Microsoft.AspNetCore.Sockets

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using Microsoft.AspNetCore.Sockets;
using Microsoft.Extensions.DependencyInjection;
public class EndPointFormatters
{
private IServiceProvider _serviceProvider;
private Dictionary<string, Dictionary<Type, Type>> _formatters = new Dictionary<string, Dictionary<Type, Type>>();
public EndPointFormatters(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public void RegisterFormatter<T, TFormatterType>(string format)
where TFormatterType : IFormatter<T>
{
Dictionary<Type, Type> formatFormatters;
if (!_formatters.TryGetValue(format, out formatFormatters))
{
formatFormatters = _formatters[format] = new Dictionary<Type, Type>();
}
formatFormatters[typeof(T)] = typeof(TFormatterType);
}
public IFormatter<T> GetFormatter<T>(string format)
{
Dictionary<Type, Type> formatters;
Type targetFormatterType;
if (_formatters.TryGetValue(format, out formatters) && formatters.TryGetValue(typeof(T), out targetFormatterType))
{
return (IFormatter<T>)_serviceProvider.GetRequiredService(targetFormatterType);
}
throw new InvalidOperationException($"No formatter register for format '{format}' and type '{typeof(T).GetType().FullName}'");
}
}

View File

@ -3,19 +3,20 @@ using Channels;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Routing;
using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.AspNetCore.Builder
{
public static class HttpDispatcherAppBuilderExtensions
{
public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action<SocketRouteBuilder> callback)
public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action<EndPointBuilder> callback)
{
var manager = new ConnectionManager();
var factory = new ChannelFactory();
var dispatcher = new HttpConnectionDispatcher(manager, factory);
var routes = new RouteBuilder(app);
callback(new SocketRouteBuilder(routes, dispatcher));
callback(new EndPointBuilder(routes, dispatcher, app.ApplicationServices));
// TODO: Use new low allocating websocket API
app.UseWebSockets();
@ -23,6 +24,53 @@ namespace Microsoft.AspNetCore.Builder
return app;
}
public class EndPointBuilder
{
private readonly HttpConnectionDispatcher _dispatcher;
private readonly RouteBuilder _routes;
private readonly IServiceProvider _serviceProvider;
public EndPointBuilder(RouteBuilder routes, HttpConnectionDispatcher dispatcher, IServiceProvider serviceProvider)
{
_routes = routes;
_dispatcher = dispatcher;
_serviceProvider = serviceProvider;
}
public EndPointConfiguration<TEndPoint> Configure<TEndPoint>() where TEndPoint : EndPoint
{
var socketFormatters = _serviceProvider.GetRequiredService<SocketFormatters>();
return new EndPointConfiguration<TEndPoint>(_routes, _dispatcher, socketFormatters.GetEndPointFormatters<TEndPoint>());
}
}
public class EndPointConfiguration<TEndPoint> where TEndPoint : EndPoint
{
private readonly HttpConnectionDispatcher _dispatcher;
private readonly RouteBuilder _routes;
private readonly EndPointFormatters _formatters;
public EndPointConfiguration(RouteBuilder routes, HttpConnectionDispatcher dispatcher, EndPointFormatters formatters)
{
_routes = routes;
_dispatcher = dispatcher;
_formatters = formatters;
}
public EndPointConfiguration<TEndPoint> MapRoute(string path)
{
_routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.Execute<TEndPoint>(path, c)));
return this;
}
public EndPointConfiguration<TEndPoint> MapFormatter<T, TFormatterType>(string format)
where TFormatterType : IFormatter<T>
{
_formatters.RegisterFormatter<T, TFormatterType>(format);
return this;
}
}
public class SocketRouteBuilder
{
private readonly HttpConnectionDispatcher _dispatcher;
@ -37,6 +85,7 @@ namespace Microsoft.AspNetCore.Builder
public void MapSocketEndpoint<TEndPoint>(string path) where TEndPoint : EndPoint
{
_routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.Execute<TEndPoint>(path, c)));
}
}
}

View File

@ -4,9 +4,9 @@ using System.Threading.Tasks;
namespace Microsoft.AspNetCore.Sockets
{
// TODO: Is this name too generic?
public interface IFormatter
public interface IFormatter<T>
{
Task<T> ReadAsync<T>(Stream stream);
Task WriteAsync<T>(T value, Stream stream);
Task<T> ReadAsync(Stream stream);
Task WriteAsync(T value, Stream stream);
}
}

View File

@ -1,8 +0,0 @@
namespace Microsoft.AspNetCore.Sockets
{
// TODO: Should the user implement this or just register their formatters?
public interface IFormatterFactory
{
IFormatter CreateFormatter(Format format, string formatType);
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
namespace Microsoft.AspNetCore.Sockets
{
public class SocketFormatters
{
private Dictionary<Type, EndPointFormatters> _formatters = new Dictionary<Type, EndPointFormatters>();
private IServiceProvider _serviceProvider;
public SocketFormatters(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public EndPointFormatters GetEndPointFormatters<TEndPoint>()
{
return GetEndPointFormatters(typeof(TEndPoint));
}
public EndPointFormatters GetEndPointFormatters(Type endPointType)
{
EndPointFormatters endPointFormatters;
if (_formatters.TryGetValue(endPointType, out endPointFormatters))
{
return endPointFormatters;
}
return _formatters[endPointType] = new EndPointFormatters(_serviceProvider);
}
}
}