SSE kind of works - duplicate messages

This commit is contained in:
moozzyk 2016-09-30 14:21:37 -07:00
parent 27ddb7de90
commit 32ed7ca0c4
6 changed files with 141 additions and 103 deletions

View File

@ -45,9 +45,10 @@ namespace WebApplication95
}
}
// TODO: don't leak HttpContext to ConnectionManager
public string GetConnectionId(HttpContext context)
{
// REVIEW: Only check the query string for longpolling
var id = context.Request.Query["id"];
if (!StringValues.IsNullOrEmpty(id))

View File

@ -41,83 +41,117 @@ namespace WebApplication95
public async Task Execute<TEndPoint>(string path, HttpContext context) where TEndPoint : EndPoint
{
if (context.Request.Path.StartsWithSegments(path + "/send"))
if (context.Request.Path.StartsWithSegments(path + "/getid"))
{
var connectionId = context.Request.Query["id"];
if (StringValues.IsNullOrEmpty(connectionId))
{
throw new InvalidOperationException("Missing connection id");
}
ConnectionState state;
if (_manager.TryGetConnection(connectionId, out state))
{
// Write the message length
await context.Request.Body.CopyToAsync(state.Connection.Input);
}
await ProcessGetId(context);
}
else if (context.Request.Path.StartsWithSegments(path + "/send"))
{
await ProcessSend(context);
}
else
{
var endpoint = (EndPoint)context.RequestServices.GetRequiredService<TEndPoint>();
var connectionId = _manager.GetConnectionId(context);
var endpoint = (EndPoint)context.RequestServices.GetRequiredService<TEndPoint>();
// Outgoing channels
if (context.Request.Path.StartsWithSegments(path + "/sse"))
{
ConnectionState state;
_manager.AddConnection(connectionId, out state);
var connectionState = GetOrCreateConnection(context);
var sse = new ServerSentEvents(connectionState);
var sse = new ServerSentEvents(state);
var ignore = endpoint.OnConnected(connectionState.Connection);
var ignore = endpoint.OnConnected(state.Connection);
state.Connection.TransportType = TransportType.ServerSentEvents;
connectionState.Connection.TransportType = TransportType.ServerSentEvents;
await sse.ProcessRequest(context);
state.Connection.Complete();
connectionState.Connection.Complete();
_manager.RemoveConnection(connectionId);
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
}
else if (context.Request.Path.StartsWithSegments(path + "/ws"))
{
ConnectionState state;
_manager.AddConnection(connectionId, out state);
var connectionState = GetOrCreateConnection(context);
var ws = new WebSockets(connectionState);
var ws = new WebSockets(state);
var ignore = endpoint.OnConnected(connectionState.Connection);
var ignore = endpoint.OnConnected(state.Connection);
state.Connection.TransportType = TransportType.WebSockets;
connectionState.Connection.TransportType = TransportType.WebSockets;
await ws.ProcessRequest(context);
state.Connection.Complete();
connectionState.Connection.Complete();
_manager.RemoveConnection(connectionId);
_manager.RemoveConnection(connectionState.Connection.ConnectionId);
}
else if (context.Request.Path.StartsWithSegments(path + "/poll"))
{
ConnectionState state;
var connectionId = context.Request.Query["id"];
ConnectionState connectionState;
bool newConnection = false;
if (_manager.AddConnection(connectionId, out state))
if (_manager.AddConnection(connectionId, out connectionState))
{
newConnection = true;
var ignore = endpoint.OnConnected(state.Connection);
var ignore = endpoint.OnConnected(connectionState.Connection);
state.Connection.TransportType = TransportType.LongPolling;
connectionState.Connection.TransportType = TransportType.LongPolling;
}
var longPolling = new LongPolling(state);
var longPolling = new LongPolling(connectionState);
await longPolling.ProcessRequest(newConnection, context);
_manager.MarkConnectionDead(connectionId);
_manager.MarkConnectionDead(connectionState.Connection.ConnectionId);
}
}
}
private async Task ProcessGetId(HttpContext context)
{
var connectionId = _manager.GetConnectionId(context);
ConnectionState state;
_manager.AddConnection(connectionId, out state);
context.Response.Headers["X-SignalR-ConnectionId"] = connectionId;
await context.Response.WriteAsync($"{{ \"connectionId\": \"{connectionId}\" }}");
return;
}
private async Task ProcessSend(HttpContext context)
{
var connectionId = context.Request.Query["id"];
if (StringValues.IsNullOrEmpty(connectionId))
{
throw new InvalidOperationException("Missing connection id");
}
ConnectionState state;
if (_manager.TryGetConnection(connectionId, out state))
{
// Write the message length
await context.Request.Body.CopyToAsync(state.Connection.Input);
}
}
private ConnectionState GetOrCreateConnection(HttpContext context)
{
var connectionId = context.Request.Query["id"];
ConnectionState connectionState;
if (StringValues.IsNullOrEmpty(connectionId))
{
connectionId = _manager.GetConnectionId(context);
_manager.AddConnection(connectionId, out connectionState);
}
else
{
if (!_manager.TryGetConnection(connectionId, out connectionState))
{
throw new InvalidOperationException("Unknown connection id");
}
}
return connectionState;
}
}
}

View File

@ -104,33 +104,9 @@ namespace WebApplication95
await Post(async state =>
{
var data = ((ArraySegment<byte>)state);
// + 100 = laziness
var buffer = new byte[data.Count + _state.Connection.ConnectionId.Length + 100];
var at = 0;
buffer[at++] = (byte)'{';
buffer[at++] = (byte)'"';
buffer[at++] = (byte)'c';
buffer[at++] = (byte)'"';
buffer[at++] = (byte)':';
buffer[at++] = (byte)'"';
int count = Encoding.UTF8.GetBytes(_state.Connection.ConnectionId, 0, _state.Connection.ConnectionId.Length, buffer, at);
at += count;
buffer[at++] = (byte)'"';
if (data.Array != null)
{
buffer[at++] = (byte)',';
buffer[at++] = (byte)'"';
buffer[at++] = (byte)'d';
buffer[at++] = (byte)'"';
buffer[at++] = (byte)':';
//buffer[at++] = (byte)'"';
Buffer.BlockCopy(data.Array, data.Offset, buffer, at, data.Count);
}
at += data.Count;
//buffer[at++] = (byte)'"';
buffer[at++] = (byte)'}';
_context.Response.ContentLength = at;
await _context.Response.Body.WriteAsync(buffer, 0, at);
_context.Response.Headers["X-SignalR-ConnectionId"] = _state.Connection.ConnectionId;
_context.Response.ContentLength = data.Count;
await _context.Response.Body.WriteAsync(data.Array, 0, data.Count);
},
value);

View File

@ -40,14 +40,12 @@ namespace WebApplication95
public async Task ProcessRequest(HttpContext context)
{
context.Response.ContentType = "text/event-stream";
context.Response.Headers["Cache-Control"] = "no-cache";
// End the connection if the client goes away
context.RequestAborted.Register(state => OnConnectionAborted(state), this);
_context = context;
await _context.Response.WriteAsync($"data: {_state.Connection.ConnectionId}\n\n");
// Set the initial TCS when everything is setup
_initTcs.TrySetResult(null);

View File

@ -78,6 +78,7 @@ namespace WebApplication95
else if (result.MessageType == WebSocketMessageType.Close)
{
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
// TODO: needs to remove itself from connection mamanger?
break;
}
}

View File

@ -4,49 +4,77 @@
<meta charset="utf-8" />
<title></title>
<script>
var connectionId;
function send() {
var body = document.getElementById('data').value;
var xhr = new XMLHttpRequest();
var url = '/chat/send?id=' + connectionId;
xhr.open("POST", url, true);
xhr.setRequestHeader('Content-type', 'application/json');
xhr.onreadystatechange = function () {
if (xhr.readyState == 4 && xhr.status == 200) {
document.addEventListener('DOMContentLoaded', () => {
function send(connectionId, data) {
var xhr = new XMLHttpRequest();
var url = '/chat/send?id=' + connectionId;
xhr.open("POST", url, true);
xhr.setRequestHeader('Content-type', 'application/json');
xhr.onreadystatechange = function () {
if (xhr.readyState == 4 && xhr.status == 200) {
}
}
var data = JSON.stringify(data);
xhr.send(data);
}
var data = JSON.stringify(body);
xhr.send(data);
}
var source = new EventSource('/chat/sse');
function xhr(method, url) {
return new Promise((resolve, reject) => {
let xhr = new XMLHttpRequest();
xhr.open(method, url);
xhr.send();
xhr.onload = () => {
if (xhr.status >= 200 && xhr.status < 300) {
resolve(xhr.response);
} else {
reject({
status: xhr.status,
statusText: xhr.statusText
});
}
};
source.onopen = function () {
console.log('Opened!');
};
source.onerror = function (err) {
console.log('Error: ' + err.type);
};
source.onmessage = function (data) {
if (!connectionId) {
connectionId = data.data;
return;
xhr.onerror = () => {
reject({
status: xhr.status,
statusText: xhr.statusText
});
};
});
}
var child = document.createElement('li');
child.innerText = data.data;
document.getElementById('messages').appendChild(child);
};
xhr('GET', '/chat/getid').then(getidPayload => {
let connectionId = JSON.parse(getidPayload).connectionId;
let source = new EventSource(`/chat/sse?id=${connectionId}`);
source.onopen = function () {
console.log('Opened!');
};
source.onerror = function (err) {
console.log('Error: ' + err.type);
};
source.onmessage = function (data) {
var child = document.createElement('li');
child.innerText = data.data;
document.getElementById('messages').appendChild(child);
};
document.getElementById('sendmessage').addEventListener('click', () => {
let data = document.getElementById('data').value;
send(connectionId, data);
});
});
});
</script>
</head>
<body>
<h1>Server Sent Events</h1>
<input type="text" id="data" />
<input type="button" value="Send" onclick="send()" />
<input type="button" id="sendmessage" value="Send" />
<ul id="messages">