From 32ed7ca0c4a532c6e87d782ebd694ba2e1859f28 Mon Sep 17 00:00:00 2001 From: moozzyk Date: Fri, 30 Sep 2016 14:21:37 -0700 Subject: [PATCH] SSE kind of works - duplicate messages --- src/WebApplication95/ConnectionManager.cs | 3 +- src/WebApplication95/Dispatcher.cs | 112 ++++++++++++++-------- src/WebApplication95/LongPolling.cs | 30 +----- src/WebApplication95/ServerSentEvents.cs | 4 +- src/WebApplication95/WebSockets.cs | 1 + src/WebApplication95/wwwroot/index.html | 94 +++++++++++------- 6 files changed, 141 insertions(+), 103 deletions(-) diff --git a/src/WebApplication95/ConnectionManager.cs b/src/WebApplication95/ConnectionManager.cs index 7acf181a93..a558bc0b82 100644 --- a/src/WebApplication95/ConnectionManager.cs +++ b/src/WebApplication95/ConnectionManager.cs @@ -45,9 +45,10 @@ namespace WebApplication95 } } + + // TODO: don't leak HttpContext to ConnectionManager public string GetConnectionId(HttpContext context) { - // REVIEW: Only check the query string for longpolling var id = context.Request.Query["id"]; if (!StringValues.IsNullOrEmpty(id)) diff --git a/src/WebApplication95/Dispatcher.cs b/src/WebApplication95/Dispatcher.cs index be14340db2..ca419e94ad 100644 --- a/src/WebApplication95/Dispatcher.cs +++ b/src/WebApplication95/Dispatcher.cs @@ -41,83 +41,117 @@ namespace WebApplication95 public async Task Execute(string path, HttpContext context) where TEndPoint : EndPoint { - if (context.Request.Path.StartsWithSegments(path + "/send")) + if (context.Request.Path.StartsWithSegments(path + "/getid")) { - var connectionId = context.Request.Query["id"]; - - if (StringValues.IsNullOrEmpty(connectionId)) - { - throw new InvalidOperationException("Missing connection id"); - } - - ConnectionState state; - if (_manager.TryGetConnection(connectionId, out state)) - { - // Write the message length - await context.Request.Body.CopyToAsync(state.Connection.Input); - } + await ProcessGetId(context); + } + else if (context.Request.Path.StartsWithSegments(path + "/send")) + { + await ProcessSend(context); } else { - var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); - var connectionId = _manager.GetConnectionId(context); + var endpoint = (EndPoint)context.RequestServices.GetRequiredService(); // Outgoing channels if (context.Request.Path.StartsWithSegments(path + "/sse")) { - ConnectionState state; - _manager.AddConnection(connectionId, out state); + var connectionState = GetOrCreateConnection(context); + var sse = new ServerSentEvents(connectionState); - var sse = new ServerSentEvents(state); + var ignore = endpoint.OnConnected(connectionState.Connection); - var ignore = endpoint.OnConnected(state.Connection); - - state.Connection.TransportType = TransportType.ServerSentEvents; + connectionState.Connection.TransportType = TransportType.ServerSentEvents; await sse.ProcessRequest(context); - state.Connection.Complete(); + connectionState.Connection.Complete(); - _manager.RemoveConnection(connectionId); + _manager.RemoveConnection(connectionState.Connection.ConnectionId); } else if (context.Request.Path.StartsWithSegments(path + "/ws")) { - ConnectionState state; - _manager.AddConnection(connectionId, out state); + var connectionState = GetOrCreateConnection(context); + var ws = new WebSockets(connectionState); - var ws = new WebSockets(state); + var ignore = endpoint.OnConnected(connectionState.Connection); - var ignore = endpoint.OnConnected(state.Connection); - - state.Connection.TransportType = TransportType.WebSockets; + connectionState.Connection.TransportType = TransportType.WebSockets; await ws.ProcessRequest(context); - state.Connection.Complete(); + connectionState.Connection.Complete(); - _manager.RemoveConnection(connectionId); + _manager.RemoveConnection(connectionState.Connection.ConnectionId); } else if (context.Request.Path.StartsWithSegments(path + "/poll")) { - ConnectionState state; + var connectionId = context.Request.Query["id"]; + ConnectionState connectionState; bool newConnection = false; - if (_manager.AddConnection(connectionId, out state)) + if (_manager.AddConnection(connectionId, out connectionState)) { newConnection = true; - var ignore = endpoint.OnConnected(state.Connection); + var ignore = endpoint.OnConnected(connectionState.Connection); - state.Connection.TransportType = TransportType.LongPolling; + connectionState.Connection.TransportType = TransportType.LongPolling; } - var longPolling = new LongPolling(state); + var longPolling = new LongPolling(connectionState); await longPolling.ProcessRequest(newConnection, context); - _manager.MarkConnectionDead(connectionId); + _manager.MarkConnectionDead(connectionState.Connection.ConnectionId); } - } } + + private async Task ProcessGetId(HttpContext context) + { + var connectionId = _manager.GetConnectionId(context); + ConnectionState state; + _manager.AddConnection(connectionId, out state); + context.Response.Headers["X-SignalR-ConnectionId"] = connectionId; + await context.Response.WriteAsync($"{{ \"connectionId\": \"{connectionId}\" }}"); + return; + } + + private async Task ProcessSend(HttpContext context) + { + var connectionId = context.Request.Query["id"]; + if (StringValues.IsNullOrEmpty(connectionId)) + { + throw new InvalidOperationException("Missing connection id"); + } + + ConnectionState state; + if (_manager.TryGetConnection(connectionId, out state)) + { + // Write the message length + await context.Request.Body.CopyToAsync(state.Connection.Input); + } + } + + private ConnectionState GetOrCreateConnection(HttpContext context) + { + var connectionId = context.Request.Query["id"]; + ConnectionState connectionState; + + if (StringValues.IsNullOrEmpty(connectionId)) + { + connectionId = _manager.GetConnectionId(context); + _manager.AddConnection(connectionId, out connectionState); + } + else + { + if (!_manager.TryGetConnection(connectionId, out connectionState)) + { + throw new InvalidOperationException("Unknown connection id"); + } + } + + return connectionState; + } } } diff --git a/src/WebApplication95/LongPolling.cs b/src/WebApplication95/LongPolling.cs index be1bc6c217..b47e9654ee 100644 --- a/src/WebApplication95/LongPolling.cs +++ b/src/WebApplication95/LongPolling.cs @@ -104,33 +104,9 @@ namespace WebApplication95 await Post(async state => { var data = ((ArraySegment)state); - // + 100 = laziness - var buffer = new byte[data.Count + _state.Connection.ConnectionId.Length + 100]; - var at = 0; - buffer[at++] = (byte)'{'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'c'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)':'; - buffer[at++] = (byte)'"'; - int count = Encoding.UTF8.GetBytes(_state.Connection.ConnectionId, 0, _state.Connection.ConnectionId.Length, buffer, at); - at += count; - buffer[at++] = (byte)'"'; - if (data.Array != null) - { - buffer[at++] = (byte)','; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'d'; - buffer[at++] = (byte)'"'; - buffer[at++] = (byte)':'; - //buffer[at++] = (byte)'"'; - Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count); - } - at += data.Count; - //buffer[at++] = (byte)'"'; - buffer[at++] = (byte)'}'; - _context.Response.ContentLength = at; - await _context.Response.Body.WriteAsync(buffer, 0, at); + _context.Response.Headers["X-SignalR-ConnectionId"] = _state.Connection.ConnectionId; + _context.Response.ContentLength = data.Count; + await _context.Response.Body.WriteAsync(data.Array, 0, data.Count); }, value); diff --git a/src/WebApplication95/ServerSentEvents.cs b/src/WebApplication95/ServerSentEvents.cs index 0aaf8287ee..d146e9ed54 100644 --- a/src/WebApplication95/ServerSentEvents.cs +++ b/src/WebApplication95/ServerSentEvents.cs @@ -40,14 +40,12 @@ namespace WebApplication95 public async Task ProcessRequest(HttpContext context) { context.Response.ContentType = "text/event-stream"; + context.Response.Headers["Cache-Control"] = "no-cache"; // End the connection if the client goes away context.RequestAborted.Register(state => OnConnectionAborted(state), this); - _context = context; - await _context.Response.WriteAsync($"data: {_state.Connection.ConnectionId}\n\n"); - // Set the initial TCS when everything is setup _initTcs.TrySetResult(null); diff --git a/src/WebApplication95/WebSockets.cs b/src/WebApplication95/WebSockets.cs index a9ab616d96..b4e57ee148 100644 --- a/src/WebApplication95/WebSockets.cs +++ b/src/WebApplication95/WebSockets.cs @@ -78,6 +78,7 @@ namespace WebApplication95 else if (result.MessageType == WebSocketMessageType.Close) { await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); + // TODO: needs to remove itself from connection mamanger? break; } } diff --git a/src/WebApplication95/wwwroot/index.html b/src/WebApplication95/wwwroot/index.html index 23d0c1e09f..20e02eaa7d 100644 --- a/src/WebApplication95/wwwroot/index.html +++ b/src/WebApplication95/wwwroot/index.html @@ -4,49 +4,77 @@

Server Sent Events

- +