Adding pluggable formatters
This commit is contained in:
parent
1694f2b791
commit
04fede0436
|
|
@ -0,0 +1,44 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace PersisitentConnection
|
||||
{
|
||||
public class FormatterResolver
|
||||
{
|
||||
private IServiceProvider _serviceProvider;
|
||||
|
||||
private Dictionary<string, Dictionary<Type, Type>> _formatters
|
||||
= new Dictionary<string, Dictionary<Type, Type>>();
|
||||
|
||||
public FormatterResolver(IServiceProvider serviceProvider)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
public void AddFormatter<T, TFormatterType>(string formatType)
|
||||
where TFormatterType : IStreamFormatter<T>
|
||||
{
|
||||
Dictionary<Type, Type> typeFormatters;
|
||||
if (!_formatters.TryGetValue(formatType, out typeFormatters))
|
||||
{
|
||||
typeFormatters = _formatters[formatType] = new Dictionary<Type, Type>();
|
||||
}
|
||||
typeFormatters[typeof(T)] = typeof(TFormatterType);
|
||||
}
|
||||
|
||||
public IStreamFormatter<T> GetFormatter<T>(string formatType)
|
||||
{
|
||||
Dictionary<Type, Type> typeFormatters;
|
||||
Type typeFormatterType;
|
||||
if (_formatters.TryGetValue(formatType, out typeFormatters) &&
|
||||
typeFormatters.TryGetValue(typeof(T), out typeFormatterType))
|
||||
{
|
||||
return (IStreamFormatter<T>)_serviceProvider.GetRequiredService(typeFormatterType);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -9,8 +9,14 @@ namespace PersisitentConnection
|
|||
{
|
||||
public class PersistentConnectionLifeTimeManager
|
||||
{
|
||||
private readonly FormatterResolver _formatterResolver;
|
||||
private readonly ConnectionList _connectionList = new ConnectionList();
|
||||
|
||||
public PersistentConnectionLifeTimeManager(FormatterResolver formatterResolver)
|
||||
{
|
||||
_formatterResolver = formatterResolver;
|
||||
}
|
||||
|
||||
public void OnConnectedAsync(Connection connection)
|
||||
{
|
||||
_connectionList.Add(connection);
|
||||
|
|
@ -25,8 +31,7 @@ namespace PersisitentConnection
|
|||
{
|
||||
foreach (var connection in _connectionList)
|
||||
{
|
||||
// var formatType = connection.Metadata.Get<string>("formatType");
|
||||
var formatter = new JsonStreamFormatter<T>();
|
||||
var formatter = _formatterResolver.GetFormatter<T>(connection.Metadata.Get<string>("formatType"));
|
||||
await formatter.WriteAsync(data, connection.Channel.GetStream());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@ using System.Threading.Tasks;
|
|||
|
||||
namespace PersisitentConnection
|
||||
{
|
||||
public class ProtobufWeatherStreamFormatter : IStreamFormatter<Weather>
|
||||
public class ProtobufWeatherStreamFormatter : IStreamFormatter<WeatherReport>
|
||||
{
|
||||
public Task<Weather> ReadAsync(Stream stream)
|
||||
public Task<WeatherReport> ReadAsync(Stream stream)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public Task WriteAsync(Weather value, Stream stream)
|
||||
public Task WriteAsync(WeatherReport value, Stream stream)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,30 +7,32 @@ namespace PersisitentConnection
|
|||
{
|
||||
public class SocialWeatherEndPoint : EndPoint
|
||||
{
|
||||
private readonly PersistentConnectionLifeTimeManager _lifetimeManager = new PersistentConnectionLifeTimeManager();
|
||||
private readonly PersistentConnectionLifeTimeManager _lifetimeManager;
|
||||
private readonly FormatterResolver _formatterResolver;
|
||||
private readonly ILogger<SocialWeatherEndPoint> _logger;
|
||||
private object _lockObj = new object();
|
||||
private WeatherReport _lastWeatherReport;
|
||||
|
||||
public SocialWeatherEndPoint(ILogger<SocialWeatherEndPoint> logger)
|
||||
public SocialWeatherEndPoint(PersistentConnectionLifeTimeManager lifetimeManager,
|
||||
FormatterResolver formatterResolver, ILogger<SocialWeatherEndPoint> logger)
|
||||
{
|
||||
_lifetimeManager = lifetimeManager;
|
||||
_formatterResolver = formatterResolver;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async override Task OnConnectedAsync(Connection connection)
|
||||
{
|
||||
_lifetimeManager.OnConnectedAsync(connection);
|
||||
await DispatchMessagesAsync(connection);
|
||||
await ProcessRequests(connection);
|
||||
_lifetimeManager.OnDisconnectedAsync(connection);
|
||||
}
|
||||
|
||||
public async Task DispatchMessagesAsync(Connection connection)
|
||||
public async Task ProcessRequests(Connection connection)
|
||||
{
|
||||
var stream = connection.Channel.GetStream();
|
||||
//var formatType = connection.Metadata.Get<string>("formatType");
|
||||
//var formatterRegistry = _serviceProvider.GetRequiredService<FormatterRegistry>();
|
||||
//var formatter = formatterRegistry.GetFormatter(formatType);
|
||||
var formatter = new JsonStreamFormatter<WeatherReport>();
|
||||
var formatter = _formatterResolver.GetFormatter<WeatherReport>(
|
||||
connection.Metadata.Get<string>("formatType"));
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -14,6 +14,10 @@ namespace PersisitentConnection
|
|||
{
|
||||
services.AddRouting();
|
||||
services.AddSingleton<SocialWeatherEndPoint>();
|
||||
services.AddTransient<PersistentConnectionLifeTimeManager>();
|
||||
services.AddSingleton(typeof(JsonStreamFormatter<>), typeof(JsonStreamFormatter<>));
|
||||
services.AddSingleton<ProtobufWeatherStreamFormatter>();
|
||||
services.AddSingleton<FormatterResolver>();
|
||||
}
|
||||
|
||||
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
|
||||
|
|
@ -29,6 +33,10 @@ namespace PersisitentConnection
|
|||
app.UseSockets(o => { o.MapEndpoint<SocialWeatherEndPoint>("/weather"); });
|
||||
app.UseStaticFiles();
|
||||
|
||||
var formatterResolver = app.ApplicationServices.GetRequiredService<FormatterResolver>();
|
||||
formatterResolver.AddFormatter<WeatherReport, JsonStreamFormatter<WeatherReport>>("json");
|
||||
formatterResolver.AddFormatter<WeatherReport, ProtobufWeatherStreamFormatter>("protobuf");
|
||||
|
||||
app.Run(async (context) =>
|
||||
{
|
||||
await context.Response.WriteAsync("Hello World!");
|
||||
|
|
|
|||
|
|
@ -14,5 +14,7 @@ namespace PersisitentConnection
|
|||
public long ReportTime { get; set; }
|
||||
|
||||
public Weather Weather { get; set; }
|
||||
|
||||
public string ZipCode { get; set; }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,8 @@
|
|||
JSON.stringify({
|
||||
Temperature: '49',
|
||||
Weather: 'Cloudy',
|
||||
ReportTime: Date.now()
|
||||
ReportTime: Date.now(),
|
||||
ZipCode: "98034"
|
||||
}));
|
||||
|
||||
event.preventDefault();
|
||||
|
|
|
|||
Loading…
Reference in New Issue