From 8b905907fe386d7117e7a5c71e4d82d300d13b7c Mon Sep 17 00:00:00 2001 From: moozzyk Date: Tue, 4 Oct 2016 18:57:09 -0700 Subject: [PATCH] Typed formatters --- .../SocketsSample/EndPoints/ChatEndPoint.cs | 5 ++ .../SocketsSample/EndPoints/HubEndpoint.cs | 6 +-- .../SocketsSample/EndPoints/RpcEndpoint.cs | 15 ++++-- ...s => InvocationDescriptorLineFormatter.cs} | 44 +++++++-------- ...InvocationResultDescriptorLineFormatter.cs | 32 +++++++++++ samples/SocketsSample/RpcFormatterFactory.cs | 24 --------- samples/SocketsSample/RpcJSonFormatter.cs | 6 +-- samples/SocketsSample/Startup.cs | 24 ++++++--- samples/SocketsSample/wwwroot/hubs.html | 15 +++--- .../Connection.cs | 6 +-- .../EndPointFormatters.cs | 40 ++++++++++++++ .../HttpDispatcherAppBuilderExtensions.cs | 53 ++++++++++++++++++- .../IFormatter.cs | 6 +-- .../IFormatterFactory.cs | 8 --- .../SocketFormatters.cs | 33 ++++++++++++ 15 files changed, 224 insertions(+), 93 deletions(-) rename samples/SocketsSample/{RpcTextFormatter.cs => InvocationDescriptorLineFormatter.cs} (67%) create mode 100644 samples/SocketsSample/InvocationResultDescriptorLineFormatter.cs delete mode 100644 samples/SocketsSample/RpcFormatterFactory.cs create mode 100644 src/Microsoft.AspNetCore.Sockets/EndPointFormatters.cs delete mode 100644 src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs create mode 100644 src/Microsoft.AspNetCore.Sockets/SocketFormatters.cs diff --git a/samples/SocketsSample/EndPoints/ChatEndPoint.cs b/samples/SocketsSample/EndPoints/ChatEndPoint.cs index 6e978d139a..2360a7698e 100644 --- a/samples/SocketsSample/EndPoints/ChatEndPoint.cs +++ b/samples/SocketsSample/EndPoints/ChatEndPoint.cs @@ -10,6 +10,11 @@ namespace SocketsSample { public class ChatEndPoint : EndPoint { + public ChatEndPoint() + { + Console.Write(0); + } + public override async Task OnConnected(Connection connection) { await Broadcast($"{connection.ConnectionId} connected ({connection.Metadata["transport"]})"); diff --git a/samples/SocketsSample/EndPoints/HubEndpoint.cs b/samples/SocketsSample/EndPoints/HubEndpoint.cs index f0048590c5..640e5fab2c 100644 --- a/samples/SocketsSample/EndPoints/HubEndpoint.cs +++ b/samples/SocketsSample/EndPoints/HubEndpoint.cs @@ -4,8 +4,6 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Channels; -using Microsoft.AspNetCore.Sockets; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using SocketsSample.Hubs; @@ -77,12 +75,10 @@ namespace SocketsSample Arguments = args }; - var formatterFactory = _endPoint._serviceProvider.GetRequiredService(); - foreach (var connection in _endPoint.Connections) { // TODO: separate serialization from writing to stream - var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, (string)connection.Metadata["formatType"]); + var formatter = _endPoint.GetFormatter((string)connection.Metadata["formatType"]); tasks.Add(formatter.WriteAsync(message, connection.Channel.GetStream())); } diff --git a/samples/SocketsSample/EndPoints/RpcEndpoint.cs b/samples/SocketsSample/EndPoints/RpcEndpoint.cs index f1b21427f0..de7610c9eb 100644 --- a/samples/SocketsSample/EndPoints/RpcEndpoint.cs +++ b/samples/SocketsSample/EndPoints/RpcEndpoint.cs @@ -36,14 +36,18 @@ namespace SocketsSample RegisterRPCEndPoint(typeof(Echo)); } + protected IFormatter GetFormatter(string format) + { + return _serviceProvider + .GetRequiredService().GetEndPointFormatters(GetType()) + .GetFormatter(format); + } + public override async Task OnConnected(Connection connection) { // TODO: Dispatch from the caller await Task.Yield(); - - var formatterFactory = _serviceProvider.GetRequiredService(); - var formatType = (string)connection.Metadata["formatType"]; - var formatter = formatterFactory.CreateFormatter(connection.Metadata.Format, formatType); + var formatter = GetFormatter((string)connection.Metadata["formatType"]); while (true) { @@ -94,7 +98,8 @@ namespace SocketsSample }; } - await formatter.WriteAsync(result, connection.Channel.GetStream()); + var resultFormatter = GetFormatter((string)connection.Metadata["formatType"]); + await resultFormatter.WriteAsync(result, connection.Channel.GetStream()); } } diff --git a/samples/SocketsSample/RpcTextFormatter.cs b/samples/SocketsSample/InvocationDescriptorLineFormatter.cs similarity index 67% rename from samples/SocketsSample/RpcTextFormatter.cs rename to samples/SocketsSample/InvocationDescriptorLineFormatter.cs index 430e561ae5..01708d1227 100644 --- a/samples/SocketsSample/RpcTextFormatter.cs +++ b/samples/SocketsSample/InvocationDescriptorLineFormatter.cs @@ -1,31 +1,42 @@ -using System; -using System.IO; +using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Sockets; namespace SocketsSample { - public class RpcTextFormatter : IFormatter + public class InvocationDescriptorLineFormatter: IFormatter { - public async Task ReadAsync(Stream stream) + public async Task ReadAsync(Stream stream) { var streamReader = new StreamReader(stream); var line = await streamReader.ReadLineAsync(); var values = line.Split(','); - object x = new InvocationDescriptor + return new InvocationDescriptor { Id = values[0].Substring(2), Method = values[1].Substring(1), Arguments = values.Skip(2).ToArray() }; - - return (T)x; } - public async Task WriteAsync(T value, Stream stream) + public async Task WriteAsync(InvocationDescriptor value, Stream stream) { + var msg = $"CI{value.Id},M{value.Method},{string.Join(",", value.Arguments.Select(a => a.ToString()))}\n"; + await WriteAsync(stream, msg); + return; + } + + private async Task WriteAsync(Stream stream, string msg) + { + var writer = new StreamWriter(stream); + await writer.WriteAsync(msg); + await writer.FlushAsync(); + } + } + + /* var result = value as InvocationResultDescriptor; if (result != null) { @@ -39,20 +50,5 @@ namespace SocketsSample var invocation = value as InvocationDescriptor; if (invocation != null) - { - var msg = $"CI{invocation.Id},M{invocation.Method},{string.Join(",", invocation.Arguments.Select(a => a.ToString()))}\n"; - await WriteAsync(stream, msg); - return; - } - - throw new NotImplementedException("Unsupported type"); - } - - private async Task WriteAsync(Stream stream, string msg) - { - var writer = new StreamWriter(stream); - await writer.WriteAsync(msg); - await writer.FlushAsync(); - } - } +*/ } diff --git a/samples/SocketsSample/InvocationResultDescriptorLineFormatter.cs b/samples/SocketsSample/InvocationResultDescriptorLineFormatter.cs new file mode 100644 index 0000000000..967802dab8 --- /dev/null +++ b/samples/SocketsSample/InvocationResultDescriptorLineFormatter.cs @@ -0,0 +1,32 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets; + +namespace SocketsSample +{ + public class InvocationResultDescriptorLineFormatter : IFormatter + { + public Task ReadAsync(Stream stream) + { + throw new NotImplementedException(); + } + + public async Task WriteAsync(InvocationResultDescriptor value, Stream stream) + { + var msg = $"RI{value.Id}," + + (!string.IsNullOrEmpty(value.Error) + ? $"E{value.Error}\n" + : $"R{value?.Result?.ToString() ?? string.Empty}\n"); + + await WriteAsync(stream, msg); + } + + private async Task WriteAsync(Stream stream, string msg) + { + var writer = new StreamWriter(stream); + await writer.WriteAsync(msg); + await writer.FlushAsync(); + } + } +} diff --git a/samples/SocketsSample/RpcFormatterFactory.cs b/samples/SocketsSample/RpcFormatterFactory.cs deleted file mode 100644 index 20040353a4..0000000000 --- a/samples/SocketsSample/RpcFormatterFactory.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using Microsoft.AspNetCore.Sockets; - -namespace SocketsSample -{ - public class RpcFormatterFactory : IFormatterFactory - { - public IFormatter CreateFormatter(Format format, string formatType) - { - if (format == Format.Text) - { - switch(formatType) - { - case "json": - return new RpcJSonFormatter(); - case "line": - return new RpcTextFormatter(); - } - } - - throw new InvalidOperationException($"No formatter for format '{format}' and formatType 'formatType'."); - } - } -} diff --git a/samples/SocketsSample/RpcJSonFormatter.cs b/samples/SocketsSample/RpcJSonFormatter.cs index 06ac3f9080..e58d9875e5 100644 --- a/samples/SocketsSample/RpcJSonFormatter.cs +++ b/samples/SocketsSample/RpcJSonFormatter.cs @@ -6,17 +6,17 @@ using Newtonsoft.Json; namespace SocketsSample { - public class RpcJSonFormatter : IFormatter + public class RpcJSonFormatter: IFormatter { private JsonSerializer _serializer = new JsonSerializer(); - public async Task ReadAsync(Stream stream) + public async Task ReadAsync(Stream stream) { var reader = new JsonTextReader(new StreamReader(stream)); return await Task.Run(() => _serializer.Deserialize(reader)); } - public Task WriteAsync(T value, Stream stream) + public Task WriteAsync(T value, Stream stream) { var writer = new JsonTextWriter(new StreamWriter(stream)); _serializer.Serialize(writer, value); diff --git a/samples/SocketsSample/Startup.cs b/samples/SocketsSample/Startup.cs index 87c3a42c41..6e46ae8e61 100644 --- a/samples/SocketsSample/Startup.cs +++ b/samples/SocketsSample/Startup.cs @@ -1,4 +1,6 @@ -using Microsoft.AspNetCore.Builder; +using System; +using System.Collections.Generic; +using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Sockets; using Microsoft.Extensions.DependencyInjection; @@ -18,7 +20,11 @@ namespace SocketsSample services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton>(); + services.AddSingleton>(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. @@ -33,11 +39,17 @@ namespace SocketsSample app.UseDeveloperExceptionPage(); } - app.UseSockets(routes => + app.UseSockets(endpoints => { - routes.MapSocketEndpoint("/hubs"); - routes.MapSocketEndpoint("/chat"); - routes.MapSocketEndpoint("/jsonrpc"); + endpoints.Configure() + .MapRoute("/hubs") + .MapFormatter("line") + .MapFormatter("line") + .MapFormatter>("json") + .MapFormatter>("json"); + + endpoints.Configure().MapRoute("/chat"); + endpoints.Configure().MapRoute("/jsonrpc"); }); } } diff --git a/samples/SocketsSample/wwwroot/hubs.html b/samples/SocketsSample/wwwroot/hubs.html index a45acd8858..9eb2f83a47 100644 --- a/samples/SocketsSample/wwwroot/hubs.html +++ b/samples/SocketsSample/wwwroot/hubs.html @@ -18,22 +18,21 @@ let response = {}; - if (document.getElementById('formatType').value == 'line') - { + if (document.getElementById('formatType').value == 'line') { let parts = evt.data.split(','); - if (evt.data[0] == 'R') - { + if (evt.data[0] == 'R') { response.Id = parts[0].slice(2); response.Result = parts[1].slice(1); } - else - { + else if (evt.data[0] == 'C') { response.Method = parts[1].slice(1); response.Arguments = parts.slice(2).join(); } + else { + response.error = parts[0].slice(1); + } } - else - { + else { response = JSON.parse(evt.data); } diff --git a/src/Microsoft.AspNetCore.Sockets/Connection.cs b/src/Microsoft.AspNetCore.Sockets/Connection.cs index 77a16f4220..cb0168f3e1 100644 --- a/src/Microsoft.AspNetCore.Sockets/Connection.cs +++ b/src/Microsoft.AspNetCore.Sockets/Connection.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Security.Claims; -using System.Threading.Tasks; +using System.Security.Claims; using Channels; namespace Microsoft.AspNetCore.Sockets diff --git a/src/Microsoft.AspNetCore.Sockets/EndPointFormatters.cs b/src/Microsoft.AspNetCore.Sockets/EndPointFormatters.cs new file mode 100644 index 0000000000..5425f1412e --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets/EndPointFormatters.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using Microsoft.AspNetCore.Sockets; +using Microsoft.Extensions.DependencyInjection; + +public class EndPointFormatters +{ + private IServiceProvider _serviceProvider; + private Dictionary> _formatters = new Dictionary>(); + + public EndPointFormatters(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + + public void RegisterFormatter(string format) + where TFormatterType : IFormatter + { + Dictionary formatFormatters; + if (!_formatters.TryGetValue(format, out formatFormatters)) + { + formatFormatters = _formatters[format] = new Dictionary(); + } + + formatFormatters[typeof(T)] = typeof(TFormatterType); + } + + public IFormatter GetFormatter(string format) + { + Dictionary formatters; + Type targetFormatterType; + + if (_formatters.TryGetValue(format, out formatters) && formatters.TryGetValue(typeof(T), out targetFormatterType)) + { + return (IFormatter)_serviceProvider.GetRequiredService(targetFormatterType); + } + + throw new InvalidOperationException($"No formatter register for format '{format}' and type '{typeof(T).GetType().FullName}'"); + } +} diff --git a/src/Microsoft.AspNetCore.Sockets/HttpDispatcherAppBuilderExtensions.cs b/src/Microsoft.AspNetCore.Sockets/HttpDispatcherAppBuilderExtensions.cs index 97a87f5cc4..93c28715f5 100644 --- a/src/Microsoft.AspNetCore.Sockets/HttpDispatcherAppBuilderExtensions.cs +++ b/src/Microsoft.AspNetCore.Sockets/HttpDispatcherAppBuilderExtensions.cs @@ -3,19 +3,20 @@ using Channels; using Microsoft.AspNetCore.Routing; using Microsoft.AspNetCore.Sockets; using Microsoft.AspNetCore.Sockets.Routing; +using Microsoft.Extensions.DependencyInjection; namespace Microsoft.AspNetCore.Builder { public static class HttpDispatcherAppBuilderExtensions { - public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action callback) + public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action callback) { var manager = new ConnectionManager(); var factory = new ChannelFactory(); var dispatcher = new HttpConnectionDispatcher(manager, factory); var routes = new RouteBuilder(app); - callback(new SocketRouteBuilder(routes, dispatcher)); + callback(new EndPointBuilder(routes, dispatcher, app.ApplicationServices)); // TODO: Use new low allocating websocket API app.UseWebSockets(); @@ -23,6 +24,53 @@ namespace Microsoft.AspNetCore.Builder return app; } + public class EndPointBuilder + { + private readonly HttpConnectionDispatcher _dispatcher; + private readonly RouteBuilder _routes; + private readonly IServiceProvider _serviceProvider; + + public EndPointBuilder(RouteBuilder routes, HttpConnectionDispatcher dispatcher, IServiceProvider serviceProvider) + { + _routes = routes; + _dispatcher = dispatcher; + _serviceProvider = serviceProvider; + } + + public EndPointConfiguration Configure() where TEndPoint : EndPoint + { + var socketFormatters = _serviceProvider.GetRequiredService(); + return new EndPointConfiguration(_routes, _dispatcher, socketFormatters.GetEndPointFormatters()); + } + } + + public class EndPointConfiguration where TEndPoint : EndPoint + { + private readonly HttpConnectionDispatcher _dispatcher; + private readonly RouteBuilder _routes; + private readonly EndPointFormatters _formatters; + + public EndPointConfiguration(RouteBuilder routes, HttpConnectionDispatcher dispatcher, EndPointFormatters formatters) + { + _routes = routes; + _dispatcher = dispatcher; + _formatters = formatters; + } + + public EndPointConfiguration MapRoute(string path) + { + _routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.Execute(path, c))); + return this; + } + + public EndPointConfiguration MapFormatter(string format) + where TFormatterType : IFormatter + { + _formatters.RegisterFormatter(format); + return this; + } + } + public class SocketRouteBuilder { private readonly HttpConnectionDispatcher _dispatcher; @@ -37,6 +85,7 @@ namespace Microsoft.AspNetCore.Builder public void MapSocketEndpoint(string path) where TEndPoint : EndPoint { _routes.AddPrefixRoute(path, new RouteHandler(c => _dispatcher.Execute(path, c))); + } } } diff --git a/src/Microsoft.AspNetCore.Sockets/IFormatter.cs b/src/Microsoft.AspNetCore.Sockets/IFormatter.cs index 4183571460..0ffbff5d42 100644 --- a/src/Microsoft.AspNetCore.Sockets/IFormatter.cs +++ b/src/Microsoft.AspNetCore.Sockets/IFormatter.cs @@ -4,9 +4,9 @@ using System.Threading.Tasks; namespace Microsoft.AspNetCore.Sockets { // TODO: Is this name too generic? - public interface IFormatter + public interface IFormatter { - Task ReadAsync(Stream stream); - Task WriteAsync(T value, Stream stream); + Task ReadAsync(Stream stream); + Task WriteAsync(T value, Stream stream); } } diff --git a/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs b/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs deleted file mode 100644 index e7313d64a4..0000000000 --- a/src/Microsoft.AspNetCore.Sockets/IFormatterFactory.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Microsoft.AspNetCore.Sockets -{ - // TODO: Should the user implement this or just register their formatters? - public interface IFormatterFactory - { - IFormatter CreateFormatter(Format format, string formatType); - } -} diff --git a/src/Microsoft.AspNetCore.Sockets/SocketFormatters.cs b/src/Microsoft.AspNetCore.Sockets/SocketFormatters.cs new file mode 100644 index 0000000000..b2064d447a --- /dev/null +++ b/src/Microsoft.AspNetCore.Sockets/SocketFormatters.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; + +namespace Microsoft.AspNetCore.Sockets +{ + public class SocketFormatters + { + private Dictionary _formatters = new Dictionary(); + + private IServiceProvider _serviceProvider; + + public SocketFormatters(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + + public EndPointFormatters GetEndPointFormatters() + { + return GetEndPointFormatters(typeof(TEndPoint)); + } + + public EndPointFormatters GetEndPointFormatters(Type endPointType) + { + EndPointFormatters endPointFormatters; + if (_formatters.TryGetValue(endPointType, out endPointFormatters)) + { + return endPointFormatters; + } + + return _formatters[endPointType] = new EndPointFormatters(_serviceProvider); + } + } +}