Refactoring
- Move http stuff into Http dispatcher - Exposed IChannel from connection - Clean up ConnectionManager (tho it's not great yet)
This commit is contained in:
parent
239b5f815f
commit
de291d4d2a
|
|
@ -1,52 +0,0 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public enum TransportType
|
||||
{
|
||||
LongPolling,
|
||||
WebSockets,
|
||||
ServerSentEvents
|
||||
}
|
||||
|
||||
public class Connection : IChannel
|
||||
{
|
||||
public TransportType TransportType { get; set; }
|
||||
|
||||
public string ConnectionId { get; set; }
|
||||
|
||||
IReadableChannel IChannel.Input => Input;
|
||||
|
||||
IWritableChannel IChannel.Output => Output;
|
||||
|
||||
internal Channel Input { get; set; }
|
||||
|
||||
internal Channel Output { get; set; }
|
||||
|
||||
public Connection()
|
||||
{
|
||||
Stream = new ChannelStream(this);
|
||||
}
|
||||
|
||||
public Stream Stream { get; }
|
||||
|
||||
public void Complete()
|
||||
{
|
||||
Input.CompleteReader();
|
||||
Input.CompleteWriter();
|
||||
|
||||
Output.CompleteReader();
|
||||
Output.CompleteWriter();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,19 +1,13 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public class ConnectionManager
|
||||
{
|
||||
private ConcurrentDictionary<string, ConnectionState> _connections = new ConcurrentDictionary<string, ConnectionState>();
|
||||
private readonly ChannelFactory _channelFactory = new ChannelFactory();
|
||||
private Timer _timer;
|
||||
|
||||
public ConnectionManager()
|
||||
|
|
@ -21,6 +15,72 @@ namespace WebApplication95
|
|||
_timer = new Timer(Scan, this, 0, 1000);
|
||||
}
|
||||
|
||||
public bool TryGetConnection(string id, out ConnectionState state)
|
||||
{
|
||||
return _connections.TryGetValue(id, out state);
|
||||
}
|
||||
|
||||
public ConnectionState ReserveConnection()
|
||||
{
|
||||
string id = MakeNewConnectionId();
|
||||
|
||||
// REVIEW: Should we create state for this?
|
||||
var state = _connections.GetOrAdd(id, connectionId => new ConnectionState());
|
||||
|
||||
// Mark it as a reservation
|
||||
state.Connection = new Connection
|
||||
{
|
||||
ConnectionId = id
|
||||
};
|
||||
return state;
|
||||
}
|
||||
|
||||
public ConnectionState AddNewConnection(IChannel channel)
|
||||
{
|
||||
string id = MakeNewConnectionId();
|
||||
|
||||
var state = _connections.GetOrAdd(id, connectionId => new ConnectionState());
|
||||
|
||||
// If there's no connection object then it's a new connection
|
||||
if (state.Connection == null)
|
||||
{
|
||||
state.Connection = new Connection
|
||||
{
|
||||
Channel = channel,
|
||||
ConnectionId = id
|
||||
};
|
||||
}
|
||||
|
||||
// Update the last seen and mark the connection as active
|
||||
state.LastSeen = DateTimeOffset.UtcNow;
|
||||
state.Active = true;
|
||||
return state;
|
||||
}
|
||||
|
||||
public void MarkConnectionInactive(string id)
|
||||
{
|
||||
ConnectionState state;
|
||||
if (_connections.TryGetValue(id, out state))
|
||||
{
|
||||
// Mark the connection as active so the background thread can look at it
|
||||
state.Active = false;
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveConnection(string id)
|
||||
{
|
||||
ConnectionState state;
|
||||
_connections.TryRemove(id, out state);
|
||||
|
||||
// Remove the connection completely
|
||||
}
|
||||
|
||||
private static string MakeNewConnectionId()
|
||||
{
|
||||
// TODO: We need to sign and encyrpt this
|
||||
return Guid.NewGuid().ToString();
|
||||
}
|
||||
|
||||
private static void Scan(object state)
|
||||
{
|
||||
((ConnectionManager)state).Scan();
|
||||
|
|
@ -28,14 +88,15 @@ namespace WebApplication95
|
|||
|
||||
private void Scan()
|
||||
{
|
||||
// Scan the registered connections looking for ones that have timed out
|
||||
foreach (var c in _connections)
|
||||
{
|
||||
if (!c.Value.Alive && (DateTimeOffset.UtcNow - c.Value.LastSeen).TotalSeconds > 30)
|
||||
if (!c.Value.Active && (DateTimeOffset.UtcNow - c.Value.LastSeen).TotalSeconds > 30)
|
||||
{
|
||||
ConnectionState s;
|
||||
if (_connections.TryRemove(c.Key, out s))
|
||||
{
|
||||
s.Connection.Complete();
|
||||
s.Connection.Channel.Dispose();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -44,60 +105,5 @@ namespace WebApplication95
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TODO: don't leak HttpContext to ConnectionManager
|
||||
public string GetConnectionId(HttpContext context)
|
||||
{
|
||||
var id = context.Request.Query["id"];
|
||||
|
||||
if (!StringValues.IsNullOrEmpty(id))
|
||||
{
|
||||
return id.ToString();
|
||||
}
|
||||
|
||||
return Guid.NewGuid().ToString();
|
||||
}
|
||||
|
||||
public bool TryGetConnection(string id, out ConnectionState state)
|
||||
{
|
||||
return _connections.TryGetValue(id, out state);
|
||||
}
|
||||
|
||||
public bool AddConnection(string id, out ConnectionState state)
|
||||
{
|
||||
state = _connections.GetOrAdd(id, connectionId => new ConnectionState());
|
||||
var isNew = state.Connection == null;
|
||||
if (isNew)
|
||||
{
|
||||
state.Connection = new Connection
|
||||
{
|
||||
ConnectionId = id,
|
||||
Input = _channelFactory.CreateChannel(),
|
||||
Output = _channelFactory.CreateChannel()
|
||||
};
|
||||
}
|
||||
state.LastSeen = DateTimeOffset.UtcNow;
|
||||
state.Alive = true;
|
||||
return isNew;
|
||||
}
|
||||
|
||||
public void MarkConnectionDead(string id)
|
||||
{
|
||||
ConnectionState state;
|
||||
if (_connections.TryGetValue(id, out state))
|
||||
{
|
||||
state.Alive = false;
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveConnection(string id)
|
||||
{
|
||||
ConnectionState state;
|
||||
if (_connections.TryRemove(id, out state))
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,18 @@
|
|||
using System;
|
||||
using Channels;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public class ConnectionState
|
||||
{
|
||||
public DateTimeOffset LastSeen { get; set; }
|
||||
public bool Alive { get; set; } = true;
|
||||
public bool Active { get; set; } = true;
|
||||
public Connection Connection { get; set; }
|
||||
public bool IsReservation { get; set; }
|
||||
}
|
||||
|
||||
public class Connection
|
||||
{
|
||||
public string ConnectionId { get; set; }
|
||||
public IChannel Channel { get; set; }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,160 +0,0 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
using WebApplication95.Routing;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public static class DispatcherExtensions
|
||||
{
|
||||
public static IApplicationBuilder UseRealTimeConnections(this IApplicationBuilder app, Action<Dispatcher> callback)
|
||||
{
|
||||
var dispatcher = new Dispatcher(app);
|
||||
callback(dispatcher);
|
||||
app.UseRouter(dispatcher.GetRouter());
|
||||
return app;
|
||||
}
|
||||
}
|
||||
|
||||
public class Dispatcher
|
||||
{
|
||||
private readonly ConnectionManager _manager = new ConnectionManager();
|
||||
private readonly RouteBuilder _routes;
|
||||
|
||||
public Dispatcher(IApplicationBuilder app)
|
||||
{
|
||||
_routes = new RouteBuilder(app);
|
||||
}
|
||||
|
||||
public void MapEndPoint<TEndPoint>(string path) where TEndPoint : EndPoint
|
||||
{
|
||||
_routes.AddPrefixRoute(path, new RouteHandler(c => Execute<TEndPoint>(path, c)));
|
||||
}
|
||||
|
||||
public IRouter GetRouter() => _routes.Build();
|
||||
|
||||
public async Task Execute<TEndPoint>(string path, HttpContext context) where TEndPoint : EndPoint
|
||||
{
|
||||
if (context.Request.Path.StartsWithSegments(path + "/getid"))
|
||||
{
|
||||
await ProcessGetId(context);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/send"))
|
||||
{
|
||||
await ProcessSend(context);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
var endpoint = (EndPoint)context.RequestServices.GetRequiredService<TEndPoint>();
|
||||
|
||||
// Outgoing channels
|
||||
if (context.Request.Path.StartsWithSegments(path + "/sse"))
|
||||
{
|
||||
var connectionState = GetOrCreateConnection(context);
|
||||
connectionState.IsReservation = false;
|
||||
var sse = new ServerSentEvents(connectionState);
|
||||
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
|
||||
connectionState.Connection.TransportType = TransportType.ServerSentEvents;
|
||||
|
||||
await sse.ProcessRequest(context);
|
||||
|
||||
connectionState.Connection.Complete();
|
||||
|
||||
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/ws"))
|
||||
{
|
||||
var connectionState = GetOrCreateConnection(context);
|
||||
connectionState.IsReservation = false;
|
||||
|
||||
var ws = new WebSockets(connectionState);
|
||||
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
|
||||
connectionState.Connection.TransportType = TransportType.WebSockets;
|
||||
|
||||
await ws.ProcessRequest(context);
|
||||
|
||||
connectionState.Connection.Complete();
|
||||
|
||||
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/poll"))
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
ConnectionState connectionState;
|
||||
|
||||
if (_manager.AddConnection(connectionId, out connectionState) || connectionState.IsReservation)
|
||||
{
|
||||
connectionState.Connection.TransportType = TransportType.LongPolling;
|
||||
connectionState.IsReservation = false;
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
}
|
||||
|
||||
var longPolling = new LongPolling(connectionState);
|
||||
|
||||
await longPolling.ProcessRequest(context);
|
||||
|
||||
_manager.MarkConnectionDead(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessGetId(HttpContext context)
|
||||
{
|
||||
var connectionId = _manager.GetConnectionId(context);
|
||||
ConnectionState state;
|
||||
_manager.AddConnection(connectionId, out state);
|
||||
state.IsReservation = true;
|
||||
context.Response.Headers["X-SignalR-ConnectionId"] = connectionId;
|
||||
await context.Response.WriteAsync($"{{ \"connectionId\": \"{connectionId}\" }}");
|
||||
return;
|
||||
}
|
||||
|
||||
private async Task ProcessSend(HttpContext context)
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
if (StringValues.IsNullOrEmpty(connectionId))
|
||||
{
|
||||
throw new InvalidOperationException("Missing connection id");
|
||||
}
|
||||
|
||||
ConnectionState state;
|
||||
if (_manager.TryGetConnection(connectionId, out state))
|
||||
{
|
||||
// Write the message length
|
||||
await context.Request.Body.CopyToAsync(state.Connection.Input);
|
||||
}
|
||||
}
|
||||
|
||||
private ConnectionState GetOrCreateConnection(HttpContext context)
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
ConnectionState connectionState;
|
||||
|
||||
if (StringValues.IsNullOrEmpty(connectionId))
|
||||
{
|
||||
connectionId = _manager.GetConnectionId(context);
|
||||
_manager.AddConnection(connectionId, out connectionState);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!_manager.TryGetConnection(connectionId, out connectionState))
|
||||
{
|
||||
throw new InvalidOperationException("Unknown connection id");
|
||||
}
|
||||
}
|
||||
|
||||
return connectionState;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,10 +16,10 @@ namespace WebApplication95.EndPoints
|
|||
{
|
||||
while (true)
|
||||
{
|
||||
var input = await connection.Input.ReadAsync();
|
||||
var input = await connection.Channel.Input.ReadAsync();
|
||||
try
|
||||
{
|
||||
if (input.IsEmpty && connection.Input.Reading.IsCompleted)
|
||||
if (input.IsEmpty && connection.Channel.Input.Reading.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
|
@ -31,17 +31,17 @@ namespace WebApplication95.EndPoints
|
|||
}
|
||||
finally
|
||||
{
|
||||
connection.Input.Advance(input.End);
|
||||
connection.Channel.Input.Advance(input.End);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connection.Input.CompleteReader();
|
||||
connection.Channel.Input.Complete();
|
||||
}
|
||||
|
||||
private async Task OnMessage(Message message, Connection connection)
|
||||
{
|
||||
var buffer = connection.Output.Alloc();
|
||||
var buffer = connection.Channel.Output.Alloc();
|
||||
var payload = message.Payload;
|
||||
buffer.Append(ref payload);
|
||||
await buffer.FlushAsync();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
using System;
|
||||
using Channels;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public class HttpChannel : IChannel
|
||||
{
|
||||
public HttpChannel(ChannelFactory factory)
|
||||
{
|
||||
Input = factory.CreateChannel();
|
||||
Output = factory.CreateChannel();
|
||||
}
|
||||
|
||||
IReadableChannel IChannel.Input => Input;
|
||||
|
||||
IWritableChannel IChannel.Output => Output;
|
||||
|
||||
public Channel Input { get; }
|
||||
|
||||
public Channel Output { get; }
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Input.CompleteReader();
|
||||
Input.CompleteWriter();
|
||||
|
||||
Output.CompleteReader();
|
||||
Output.CompleteWriter();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,187 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Channels;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Primitives;
|
||||
using WebApplication95.Routing;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public class HttpConnectionDispatcher
|
||||
{
|
||||
private readonly ConnectionManager _manager = new ConnectionManager();
|
||||
private readonly ChannelFactory _channelFactory = new ChannelFactory();
|
||||
private readonly RouteBuilder _routes;
|
||||
|
||||
public HttpConnectionDispatcher(IApplicationBuilder app)
|
||||
{
|
||||
_routes = new RouteBuilder(app);
|
||||
}
|
||||
|
||||
public void MapSocketEndpoint<TEndPoint>(string path) where TEndPoint : EndPoint
|
||||
{
|
||||
_routes.AddPrefixRoute(path, new RouteHandler(c => Execute<TEndPoint>(path, c)));
|
||||
}
|
||||
|
||||
public IRouter GetRouter() => _routes.Build();
|
||||
|
||||
public async Task Execute<TEndPoint>(string path, HttpContext context) where TEndPoint : EndPoint
|
||||
{
|
||||
if (context.Request.Path.StartsWithSegments(path + "/getid"))
|
||||
{
|
||||
await ProcessGetId(context);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/send"))
|
||||
{
|
||||
await ProcessSend(context);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Get the end point mapped to this http connection
|
||||
var endpoint = (EndPoint)context.RequestServices.GetRequiredService<TEndPoint>();
|
||||
|
||||
// Server sent events transport
|
||||
if (context.Request.Path.StartsWithSegments(path + "/sse"))
|
||||
{
|
||||
var connectionState = GetOrCreateConnection(context);
|
||||
var sse = new ServerSentEvents((HttpChannel)connectionState.Connection.Channel);
|
||||
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
|
||||
await sse.ProcessRequest(context);
|
||||
|
||||
connectionState.Connection.Channel.Dispose();
|
||||
|
||||
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/ws"))
|
||||
{
|
||||
var connectionState = GetOrCreateConnection(context);
|
||||
var ws = new WebSockets((HttpChannel)connectionState.Connection.Channel);
|
||||
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
|
||||
await ws.ProcessRequest(context);
|
||||
|
||||
connectionState.Connection.Channel.Dispose();
|
||||
|
||||
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
else if (context.Request.Path.StartsWithSegments(path + "/poll"))
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
ConnectionState connectionState;
|
||||
|
||||
bool isNewConnection = false;
|
||||
if (_manager.TryGetConnection(connectionId, out connectionState))
|
||||
{
|
||||
// Treat reserved connections like new ones
|
||||
if (connectionState.Connection.Channel == null)
|
||||
{
|
||||
var channel = new HttpChannel(_channelFactory);
|
||||
|
||||
// REVIEW: The connection manager should encapsulate this...
|
||||
connectionState.Active = true;
|
||||
connectionState.LastSeen = DateTimeOffset.UtcNow;
|
||||
connectionState.Connection.Channel = channel;
|
||||
isNewConnection = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Add new connection
|
||||
connectionState = _manager.AddNewConnection(new HttpChannel(_channelFactory));
|
||||
isNewConnection = true;
|
||||
}
|
||||
|
||||
// Raise OnConnected for new connections only since polls happen all the time
|
||||
if (isNewConnection)
|
||||
{
|
||||
var ignore = endpoint.OnConnected(connectionState.Connection);
|
||||
}
|
||||
|
||||
var longPolling = new LongPolling((HttpChannel)connectionState.Connection.Channel);
|
||||
|
||||
await longPolling.ProcessRequest(context);
|
||||
|
||||
_manager.MarkConnectionInactive(connectionState.Connection.ConnectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Task ProcessGetId(HttpContext context)
|
||||
{
|
||||
// Reserve an id for this connection
|
||||
var state = _manager.ReserveConnection();
|
||||
|
||||
// Get the bytes for the connection id
|
||||
var connectionIdBuffer = Encoding.UTF8.GetBytes(state.Connection.ConnectionId);
|
||||
|
||||
// Write it out to the response with the right content length
|
||||
context.Response.ContentLength = connectionIdBuffer.Length;
|
||||
return context.Response.Body.WriteAsync(connectionIdBuffer, 0, connectionIdBuffer.Length);
|
||||
}
|
||||
|
||||
private Task ProcessSend(HttpContext context)
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
if (StringValues.IsNullOrEmpty(connectionId))
|
||||
{
|
||||
throw new InvalidOperationException("Missing connection id");
|
||||
}
|
||||
|
||||
ConnectionState state;
|
||||
if (_manager.TryGetConnection(connectionId, out state))
|
||||
{
|
||||
// If we received an HTTP POST for the connection id and it's not an HttpChannel then fail.
|
||||
// You can't write to a TCP channel directly from here.
|
||||
var httpChannel = state.Connection.Channel as HttpChannel;
|
||||
|
||||
if (httpChannel == null)
|
||||
{
|
||||
throw new InvalidOperationException("No channel");
|
||||
}
|
||||
|
||||
return context.Request.Body.CopyToAsync(httpChannel.Input);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private ConnectionState GetOrCreateConnection(HttpContext context)
|
||||
{
|
||||
var connectionId = context.Request.Query["id"];
|
||||
ConnectionState connectionState;
|
||||
|
||||
// There's no connection id so this is a branch new connection
|
||||
if (StringValues.IsNullOrEmpty(connectionId))
|
||||
{
|
||||
var channel = new HttpChannel(_channelFactory);
|
||||
connectionState = _manager.AddNewConnection(channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
// REVIEW: Fail if not reserved? Reused an existing connection id?
|
||||
|
||||
// There's a connection id
|
||||
if (!_manager.TryGetConnection(connectionId, out connectionState))
|
||||
{
|
||||
throw new InvalidOperationException("Unknown connection id");
|
||||
}
|
||||
|
||||
// Reserved connection, we need to provide a channel
|
||||
if (connectionState.Connection.Channel == null)
|
||||
{
|
||||
connectionState.Connection.Channel = new HttpChannel(_channelFactory);
|
||||
}
|
||||
}
|
||||
|
||||
return connectionState;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
|
||||
namespace WebApplication95
|
||||
{
|
||||
public static class HttpDispatcherAppBuilderExtensions
|
||||
{
|
||||
public static IApplicationBuilder UseSockets(this IApplicationBuilder app, Action<HttpConnectionDispatcher> callback)
|
||||
{
|
||||
var dispatcher = new HttpConnectionDispatcher(app);
|
||||
callback(dispatcher);
|
||||
app.UseRouter(dispatcher.GetRouter());
|
||||
return app;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -14,12 +14,12 @@ namespace WebApplication95
|
|||
private TaskCompletionSource<object> _initTcs = new TaskCompletionSource<object>();
|
||||
private TaskCompletionSource<object> _lifetime = new TaskCompletionSource<object>();
|
||||
private HttpContext _context;
|
||||
private readonly ConnectionState _state;
|
||||
private readonly HttpChannel _channel;
|
||||
|
||||
public LongPolling(ConnectionState state)
|
||||
public LongPolling(HttpChannel channel)
|
||||
{
|
||||
_lastTask = _initTcs.Task;
|
||||
_state = state;
|
||||
_channel = channel;
|
||||
}
|
||||
|
||||
private Task Post(Func<object, Task> work, object state)
|
||||
|
|
@ -56,21 +56,25 @@ namespace WebApplication95
|
|||
|
||||
private async Task ProcessMessages(HttpContext context)
|
||||
{
|
||||
var buffer = await _state.Connection.Output.ReadAsync();
|
||||
var buffer = await _channel.Output.ReadAsync();
|
||||
|
||||
foreach (var memory in buffer)
|
||||
if (buffer.IsEmpty && _channel.Output.Reading.IsCompleted)
|
||||
{
|
||||
ArraySegment<byte> data;
|
||||
if (memory.TryGetArray(out data))
|
||||
{
|
||||
await Send(data);
|
||||
|
||||
// Advance the buffer one block of memory
|
||||
buffer = buffer.Slice(memory.Length);
|
||||
_state.Connection.Output.Advance(buffer.Start);
|
||||
break;
|
||||
}
|
||||
CompleteRequest();
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await Send(buffer);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_channel.Output.Advance(buffer.End);
|
||||
}
|
||||
|
||||
|
||||
CompleteRequest();
|
||||
}
|
||||
|
||||
private static void OnConnectionAborted(object state)
|
||||
|
|
@ -88,18 +92,22 @@ namespace WebApplication95
|
|||
_lifetime);
|
||||
}
|
||||
|
||||
public async Task Send(ArraySegment<byte> value)
|
||||
public Task Send(ReadableBuffer value)
|
||||
{
|
||||
await Post(async state =>
|
||||
return Post(async state =>
|
||||
{
|
||||
var data = ((ArraySegment<byte>)state);
|
||||
_context.Response.Headers["X-SignalR-ConnectionId"] = _state.Connection.ConnectionId;
|
||||
_context.Response.ContentLength = data.Count;
|
||||
await _context.Response.Body.WriteAsync(data.Array, data.Offset, data.Count);
|
||||
var data = ((ReadableBuffer)state);
|
||||
_context.Response.ContentLength = data.Length;
|
||||
foreach (var memory in data)
|
||||
{
|
||||
ArraySegment<byte> segment;
|
||||
if (memory.TryGetArray(out segment))
|
||||
{
|
||||
await _context.Response.Body.WriteAsync(segment.Array, segment.Offset, segment.Count);
|
||||
}
|
||||
}
|
||||
},
|
||||
value);
|
||||
|
||||
CompleteRequest();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,11 +13,11 @@ namespace WebApplication95
|
|||
private TaskCompletionSource<object> _initTcs = new TaskCompletionSource<object>();
|
||||
private TaskCompletionSource<object> _lifetime = new TaskCompletionSource<object>();
|
||||
private HttpContext _context;
|
||||
private readonly ConnectionState _state;
|
||||
private readonly HttpChannel _channel;
|
||||
|
||||
public ServerSentEvents(ConnectionState state)
|
||||
public ServerSentEvents(HttpChannel channel)
|
||||
{
|
||||
_state = state;
|
||||
_channel = channel;
|
||||
_lastTask = _initTcs.Task;
|
||||
var ignore = StartSending();
|
||||
}
|
||||
|
|
@ -75,36 +75,29 @@ namespace WebApplication95
|
|||
|
||||
while (true)
|
||||
{
|
||||
var buffer = await _state.Connection.Output.ReadAsync();
|
||||
var buffer = await _channel.Output.ReadAsync();
|
||||
|
||||
if (buffer.IsEmpty && _state.Connection.Output.Reading.IsCompleted)
|
||||
if (buffer.IsEmpty && _channel.Output.Reading.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
foreach (var memory in buffer)
|
||||
{
|
||||
ArraySegment<byte> data;
|
||||
if (memory.TryGetArray(out data))
|
||||
{
|
||||
await Send(data);
|
||||
}
|
||||
}
|
||||
await Send(buffer);
|
||||
|
||||
_state.Connection.Output.Advance(buffer.End);
|
||||
_channel.Output.Advance(buffer.End);
|
||||
}
|
||||
|
||||
_state.Connection.Output.CompleteReader();
|
||||
_channel.Output.CompleteReader();
|
||||
}
|
||||
|
||||
private Task Send(ArraySegment<byte> value)
|
||||
private Task Send(ReadableBuffer value)
|
||||
{
|
||||
return Post(async state =>
|
||||
{
|
||||
var data = ((ArraySegment<byte>)state);
|
||||
var data = ((ReadableBuffer)state);
|
||||
// TODO: Pooled buffers
|
||||
// 8 = 6(data: ) + 2 (\n\n)
|
||||
var buffer = new byte[8 + data.Count];
|
||||
var buffer = new byte[8 + data.Length];
|
||||
var at = 0;
|
||||
buffer[at++] = (byte)'d';
|
||||
buffer[at++] = (byte)'a';
|
||||
|
|
@ -112,8 +105,8 @@ namespace WebApplication95
|
|||
buffer[at++] = (byte)'a';
|
||||
buffer[at++] = (byte)':';
|
||||
buffer[at++] = (byte)' ';
|
||||
Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count);
|
||||
at += data.Count;
|
||||
data.CopyTo(new Span<byte>(buffer, at, buffer.Length - at));
|
||||
at += data.Length;
|
||||
buffer[at++] = (byte)'\n';
|
||||
buffer[at++] = (byte)'\n';
|
||||
await _context.Response.Body.WriteAsync(buffer, 0, at);
|
||||
|
|
|
|||
|
|
@ -31,9 +31,9 @@ namespace WebApplication95
|
|||
app.UseDeveloperExceptionPage();
|
||||
}
|
||||
|
||||
app.UseRealTimeConnections(d =>
|
||||
app.UseSockets(d =>
|
||||
{
|
||||
d.MapEndPoint<ChatEndPoint>("/chat");
|
||||
d.MapSocketEndpoint<ChatEndPoint>("/chat");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,12 +10,12 @@ namespace WebApplication95
|
|||
public class WebSockets
|
||||
{
|
||||
private WebSocket _ws;
|
||||
private ConnectionState _state;
|
||||
private HttpChannel _channel;
|
||||
private TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
|
||||
|
||||
public WebSockets(ConnectionState state)
|
||||
public WebSockets(HttpChannel channel)
|
||||
{
|
||||
_state = state;
|
||||
_channel = channel;
|
||||
var ignore = StartSending();
|
||||
}
|
||||
|
||||
|
|
@ -25,9 +25,9 @@ namespace WebApplication95
|
|||
|
||||
while (true)
|
||||
{
|
||||
var buffer = await _state.Connection.Output.ReadAsync();
|
||||
var buffer = await _channel.Output.ReadAsync();
|
||||
|
||||
if (buffer.IsEmpty && _state.Connection.Output.Reading.IsCompleted)
|
||||
if (buffer.IsEmpty && _channel.Output.Reading.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
|
@ -41,10 +41,10 @@ namespace WebApplication95
|
|||
}
|
||||
}
|
||||
|
||||
_state.Connection.Output.Advance(buffer.End);
|
||||
_channel.Output.Advance(buffer.End);
|
||||
}
|
||||
|
||||
_state.Connection.Output.CompleteReader();
|
||||
_channel.Output.CompleteReader();
|
||||
}
|
||||
|
||||
public async Task ProcessRequest(HttpContext context)
|
||||
|
|
@ -69,11 +69,11 @@ namespace WebApplication95
|
|||
// TODO: Fragments
|
||||
if (result.MessageType == WebSocketMessageType.Text)
|
||||
{
|
||||
await _state.Connection.Input.WriteAsync(new Span<byte>(buffer, 0, result.Count));
|
||||
await _channel.Input.WriteAsync(new Span<byte>(buffer, 0, result.Count));
|
||||
}
|
||||
else if (result.MessageType == WebSocketMessageType.Binary)
|
||||
{
|
||||
await _state.Connection.Input.WriteAsync(new Span<byte>(buffer, 0, result.Count));
|
||||
await _channel.Input.WriteAsync(new Span<byte>(buffer, 0, result.Count));
|
||||
}
|
||||
else if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
document.addEventListener('DOMContentLoaded', () => {
|
||||
function send(connectionId, data) {
|
||||
var xhr = new XMLHttpRequest();
|
||||
var url = '/chat/send?id=' + connectionId;
|
||||
var url = `/chat/send?id=${connectionId}`;
|
||||
xhr.open("POST", url, true);
|
||||
xhr.onreadystatechange = function () {
|
||||
if (xhr.readyState == 4 && xhr.status == 200) {
|
||||
|
|
@ -41,9 +41,7 @@
|
|||
});
|
||||
}
|
||||
|
||||
xhr('GET', '/chat/getid').then(getidPayload => {
|
||||
let connectionId = JSON.parse(getidPayload).connectionId;
|
||||
|
||||
xhr('GET', '/chat/getid').then(connectionId => {
|
||||
let source = new EventSource(`/chat/sse?id=${connectionId}`);
|
||||
|
||||
source.onopen = function () {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@
|
|||
|
||||
function send(connectionId, data) {
|
||||
var xhr = new XMLHttpRequest();
|
||||
var url = '/chat/send?id=' + connectionId;
|
||||
var url = `/chat/send?id=${connectionId}`;
|
||||
xhr.open("POST", url, true);
|
||||
xhr.onreadystatechange = function () {
|
||||
if (xhr.readyState == 4 && xhr.status == 200) {
|
||||
|
|
@ -56,9 +56,7 @@
|
|||
xhr.send(null);
|
||||
}
|
||||
|
||||
xhr('GET', '/chat/getid').then(getidPayload => {
|
||||
let connectionId = JSON.parse(getidPayload).connectionId;
|
||||
|
||||
xhr('GET', '/chat/getid').then(connectionId => {
|
||||
document.getElementById('sendmessage').addEventListener('click', () => {
|
||||
let data = document.getElementById('data').value;
|
||||
send(connectionId, data);
|
||||
|
|
|
|||
Loading…
Reference in New Issue