Fixing line protocol

This commit is contained in:
moozzyk 2016-10-11 15:51:48 -07:00
parent a8c831bad6
commit 5d41b218f0
6 changed files with 64 additions and 133 deletions

View File

@ -1,54 +0,0 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample
{
public class InvocationDescriptorLineFormatter: IFormatter<InvocationDescriptor>
{
public async Task<InvocationDescriptor> ReadAsync(Stream stream)
{
var streamReader = new StreamReader(stream);
var line = await streamReader.ReadLineAsync();
var values = line.Split(',');
return new InvocationDescriptor
{
Id = values[0].Substring(2),
Method = values[1].Substring(1),
Arguments = values.Skip(2).ToArray()
};
}
public async Task WriteAsync(InvocationDescriptor value, Stream stream)
{
var msg = $"CI{value.Id},M{value.Method},{string.Join(",", value.Arguments.Select(a => a.ToString()))}\n";
await WriteAsync(stream, msg);
return;
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
/*
var result = value as InvocationResultDescriptor;
if (result != null)
{
var msg = $"RI{result.Id}," + string.IsNullOrEmpty(result.Error) != null
? $"E{result.Error}\n"
: $"R{result.Result.ToString()}\n";
await WriteAsync(stream, msg);
return;
}
var invocation = value as InvocationDescriptor;
if (invocation != null)
*/
}

View File

@ -1,32 +0,0 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
namespace SocketsSample
{
public class InvocationResultDescriptorLineFormatter : IFormatter<InvocationResultDescriptor>
{
public Task<InvocationResultDescriptor> ReadAsync(Stream stream)
{
throw new NotImplementedException();
}
public async Task WriteAsync(InvocationResultDescriptor value, Stream stream)
{
var msg = $"RI{value.Id}," +
(!string.IsNullOrEmpty(value.Error)
? $"E{value.Error}\n"
: $"R{value?.Result?.ToString() ?? string.Empty}\n");
await WriteAsync(stream, msg);
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
}

View File

@ -7,35 +7,35 @@ namespace SocketsSample
{
public class JSonInvocationAdapter : IInvocationAdapter
{
IServiceProvider _serviceProvider;
private JsonSerializer _serializer = new JsonSerializer();
public JSonInvocationAdapter(IServiceProvider serviceProvider)
public JSonInvocationAdapter()
{
_serviceProvider = serviceProvider;
}
public async Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
{
// TODO: use a formatter (?)
var reader = new JsonTextReader(new StreamReader(stream));
return await Task.Run(() => _serializer.Deserialize<InvocationDescriptor>(reader));
}
public Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, resultDescriptor);
writer.Flush();
Write(stream, resultDescriptor);
return Task.FromResult(0);
}
public Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, invocationDescriptor);
writer.Flush();
Write(stream, invocationDescriptor);
return Task.FromResult(0);
}
private void Write(Stream stream, object value)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, value);
writer.Flush();
}
}
}

View File

@ -0,0 +1,52 @@

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace SocketsSample
{
public class LineInvocationAdapter : IInvocationAdapter
{
public async Task<InvocationDescriptor> CreateInvocationDescriptor(Stream stream, Func<string, Type[]> getParams)
{
var streamReader = new StreamReader(stream);
var line = await streamReader.ReadLineAsync();
var values = line.Split(',');
var method = values[1].Substring(1);
return new InvocationDescriptor
{
Id = values[0].Substring(2),
Method = method,
Arguments = values.Skip(2).Zip(getParams(method), (v, t) => Convert.ChangeType(v, t)).ToArray()
};
}
public async Task InvokeClientMethod(Stream stream, InvocationDescriptor invocationDescriptor)
{
var msg = $"CI{invocationDescriptor.Id},M{invocationDescriptor.Method},{string.Join(",", invocationDescriptor.Arguments.Select(a => a.ToString()))}\n";
await WriteAsync(stream, msg);
}
public async Task WriteInvocationResult(Stream stream, InvocationResultDescriptor resultDescriptor)
{
if (string.IsNullOrEmpty(resultDescriptor.Error))
{
await WriteAsync(stream, $"RI{resultDescriptor.Id},E{resultDescriptor.Error}\n");
}
else
{
await WriteAsync(stream, $"RI{resultDescriptor.Id},R{(resultDescriptor.Result != null ? resultDescriptor.Result.ToString() : string.Empty)}\n");
}
}
private async Task WriteAsync(Stream stream, string msg)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(msg);
await writer.FlushAsync();
}
}
}

View File

@ -1,27 +0,0 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Newtonsoft.Json;
namespace SocketsSample
{
public class RpcJSonFormatter<T>: IFormatter<T>
{
private JsonSerializer _serializer = new JsonSerializer();
public async Task<T> ReadAsync(Stream stream)
{
var reader = new JsonTextReader(new StreamReader(stream));
return await Task.Run(() => _serializer.Deserialize<T>(reader));
}
public Task WriteAsync(T value, Stream stream)
{
var writer = new JsonTextWriter(new StreamWriter(stream));
_serializer.Serialize(writer, value);
writer.Flush();
return Task.FromResult(0);
}
}
}

View File

@ -21,10 +21,6 @@ namespace SocketsSample
services.AddSingleton<ChatEndPoint>();
services.AddSingleton<SocketFormatters>();
services.AddSingleton<InvocationDescriptorLineFormatter>();
services.AddSingleton<InvocationResultDescriptorLineFormatter>();
services.AddSingleton<RpcJSonFormatter<InvocationDescriptor>>();
services.AddSingleton<RpcJSonFormatter<InvocationResultDescriptor>>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
@ -49,13 +45,9 @@ namespace SocketsSample
app.UseFormatters(formatters=>
{
formatters.MapFormatter<InvocationDescriptor, InvocationDescriptorLineFormatter>("line");
formatters.MapFormatter<InvocationResultDescriptor, InvocationResultDescriptorLineFormatter>("line");
formatters.MapFormatter<InvocationDescriptor, RpcJSonFormatter<InvocationDescriptor>>("json");
formatters.MapFormatter<InvocationResultDescriptor, RpcJSonFormatter<InvocationResultDescriptor>>("json");
formatters.AddInvocationAdapter("protobuf", new Protobuf.ProtobufInvocationAdapter());
formatters.AddInvocationAdapter("json", new JSonInvocationAdapter(app.ApplicationServices));
formatters.AddInvocationAdapter("json", new JSonInvocationAdapter());
formatters.AddInvocationAdapter("line", new LineInvocationAdapter());
});
}
}