Adding error handling

Fixing SSE transport on the server
This commit is contained in:
moozzyk 2016-11-02 15:31:01 -07:00
parent 61c527f23c
commit 2bbca5e7fe
8 changed files with 144 additions and 66 deletions

View File

@ -25,22 +25,37 @@
document.getElementById('head1').innerHTML = transports ? transports.join(', ') : "auto (WebSockets)"; document.getElementById('head1').innerHTML = transports ? transports.join(', ') : "auto (WebSockets)";
let connectButton = document.getElementById('connect');
let connection = new RpcConnection(`http://${document.location.host}/hubs`, 'formatType=json&format=text'); let connection = new RpcConnection(`http://${document.location.host}/hubs`, 'formatType=json&format=text');
connection.on('Send', msg => { connection.on('Send', msg => {
addLine(msg); }); addLine(msg); });
let isConnected = false; connection.connectionClosed = e => {
if (e) {
addLine('Connection closed with error: ' + e, 'red');
}
else {
addLine('Disconnected', 'green');
}
}
let isConnected = false;
let connectButton = document.getElementById('connect');
connectButton.addEventListener('click', () => { connectButton.addEventListener('click', () => {
connection.start(transports) connection.start(transports)
.then(() => { .then(() => {
isConnected = true; isConnected = true;
addLine('Connected successfully', 'green');
}) })
.catch(err => { .catch(err => {
addLine(err, true); addLine(err, 'red');
}); });
}); });
let disconnectButton = document.getElementById('disconnect');
disconnectButton.addEventListener('click', () => {
connection.stop();
isConnected = false;
});
document.getElementById('sendmessage').addEventListener('submit', event => { document.getElementById('sendmessage').addEventListener('submit', event => {
let data = document.getElementById('data').value; let data = document.getElementById('data').value;
@ -54,7 +69,7 @@
} }
}) })
.catch(err => { .catch(err => {
addLine(err, true); addLine(err, 'red');
}); });
} }
@ -62,10 +77,10 @@
}); });
}); });
function addLine(line, isError) { function addLine(line, color) {
var child = document.createElement('li'); var child = document.createElement('li');
if (isError === true) { if (color) {
child.style.color = 'red'; child.style.color = color;
} }
child.innerText = line; child.innerText = line;
document.getElementById('messages').appendChild(child); document.getElementById('messages').appendChild(child);
@ -81,6 +96,7 @@
</select> </select>
<input type="button" id="connect" value="Connect" /> <input type="button" id="connect" value="Connect" />
<input type="button" id="disconnect" value="Disconnect" />
</div> </div>
<form id="sendmessage"> <form id="sendmessage">

View File

@ -1,2 +1,3 @@
declare type DataReceived = (data: any) => void; declare type DataReceived = (data: any) => void;
declare type ErrorHandler = (e: any) => void; declare type ErrorHandler = (e: any) => void;
declare type ConnectionClosed = (e?: any) => void;

View File

@ -11,13 +11,12 @@ class Connection {
private queryString: string; private queryString: string;
private connectionId: string; private connectionId: string;
private transport: ITransport; private transport: ITransport;
private dataReceivedCallback: DataReceived; private dataReceivedCallback: DataReceived = (data: any) => { };
private errorHandler: ErrorHandler; private connectionClosedCallback: ConnectionClosed = (error?: any) => { };
constructor(url: string, queryString: string = "") { constructor(url: string, queryString: string = "") {
this.url = url; this.url = url;
this.queryString = queryString; this.queryString = queryString;
this.connectionState = ConnectionState.Disconnected; this.connectionState = ConnectionState.Disconnected;
} }
@ -75,7 +74,7 @@ class Connection {
private tryStartTransport(transports: ITransport[], index: number): Promise<ITransport> { private tryStartTransport(transports: ITransport[], index: number): Promise<ITransport> {
let thisConnection = this; let thisConnection = this;
transports[index].onDataReceived = data => thisConnection.dataReceivedCallback(data); transports[index].onDataReceived = data => thisConnection.dataReceivedCallback(data);
transports[index].onError = e => thisConnection.errorHandler(e); transports[index].onError = e => thisConnection.stopConnection(e);
return transports[index].connect(this.url, this.queryString) return transports[index].connect(this.url, this.queryString)
.then(() => { .then(() => {
@ -102,18 +101,23 @@ class Connection {
stop(): void { stop(): void {
if (this.connectionState != ConnectionState.Connected) { if (this.connectionState != ConnectionState.Connected) {
throw new Error("Cannot stop the connection if is not in the 'Connected' State"); throw new Error("Cannot stop the connection if it is not in the 'Connected' State");
} }
this.stopConnection();
}
private stopConnection(error?: any) {
this.transport.stop(); this.transport.stop();
this.connectionState = ConnectionState.Disconnected; this.connectionState = ConnectionState.Disconnected;
this.connectionClosedCallback(error);
} }
set dataReceived(callback: DataReceived) { set dataReceived(callback: DataReceived) {
this.dataReceivedCallback = callback; this.dataReceivedCallback = callback;
} }
set onError(callback: ErrorHandler) { set connectionClosed(callback: ConnectionClosed) {
this.errorHandler = callback; this.connectionClosedCallback = callback;
} }
} }

View File

@ -6,40 +6,52 @@ class LongPollingTransport implements ITransport {
connect(url: string, queryString: string): Promise<void> { connect(url: string, queryString: string): Promise<void> {
this.url = url; this.url = url;
this.queryString = queryString; this.queryString = queryString;
this.pollXhr = new XMLHttpRequest();
// TODO: resolve promise on open sending? + reject on error
this.poll(url + "/poll?" + this.queryString) this.poll(url + "/poll?" + this.queryString)
return Promise.resolve(); return Promise.resolve();
} }
private poll(url: string): void { private poll(url: string): void {
//TODO: timeout let thisLongPollingTransport = this;
this.pollXhr.open("GET", url, true); let pollXhr = new XMLHttpRequest();
this.pollXhr.send();
this.pollXhr.onload = () => { pollXhr.onload = () => {
if (this.pollXhr.status >= 200 && this.pollXhr.status < 300) { if (pollXhr.status == 200) {
this.onDataReceived(this.pollXhr.response); if (thisLongPollingTransport.onDataReceived) {
this.poll(url); thisLongPollingTransport.onDataReceived(pollXhr.response);
}
thisLongPollingTransport.poll(url);
}
else if (this.pollXhr.status == 204) {
// TODO: closed event?
} }
else { else {
//TODO: handle error if (thisLongPollingTransport.onError) {
/* thisLongPollingTransport.onError({
{ status: pollXhr.status,
status: xhr.status, statusText: pollXhr.statusText
statusText: xhr.statusText });
}; }
}*/ }
};
this.pollXhr.onerror = () => {
/*
reject({
status: xhr.status,
statusText: xhr.statusText
});*/
//TODO: handle error
};
}; };
pollXhr.onerror = () => {
if (thisLongPollingTransport.onError) {
thisLongPollingTransport.onError({
status: pollXhr.status,
statusText: pollXhr.statusText
});
}
};
pollXhr.ontimeout = () => {
thisLongPollingTransport.poll(url);
}
this.pollXhr = pollXhr;
this.pollXhr.open("GET", url, true);
// TODO: consider making timeout configurable
this.pollXhr.timeout = 110000;
this.pollXhr.send();
} }
send(data: any): Promise<void> { send(data: any): Promise<void> {
@ -47,7 +59,10 @@ class LongPollingTransport implements ITransport {
} }
stop(): void { stop(): void {
this.pollXhr.abort(); if (this.pollXhr) {
this.pollXhr.abort();
this.pollXhr = null;
}
} }
onDataReceived: DataReceived; onDataReceived: DataReceived;

View File

@ -93,4 +93,8 @@ class RpcConnection {
on(methodName: string, method: (...args: any[]) => void) { on(methodName: string, method: (...args: any[]) => void) {
this.methods[methodName] = method; this.methods[methodName] = method;
} }
set connectionClosed(callback: ConnectionClosed) {
this.connection.connectionClosed = callback;
}
} }

View File

@ -13,21 +13,36 @@ class ServerSentEventsTransport implements ITransport {
this.queryString = queryString; this.queryString = queryString;
this.url = url; this.url = url;
let tmp = `${this.url}/sse?${this.queryString}`; let tmp = `${this.url}/sse?${this.queryString}`;
try {
this.eventSource = new EventSource(`${this.url}/sse?${this.queryString}`);
this.eventSource.onmessage = (e: MessageEvent) => { return new Promise((resolve, reject) => {
this.onDataReceived(e.data); let eventSource = new EventSource(`${this.url}/sse?${this.queryString}`);
};
this.eventSource.onerror = (e: Event) => { try {
// todo: handle errors let thisEventSourceTransport = this;
eventSource.onmessage = (e: MessageEvent) => {
if (thisEventSourceTransport.onDataReceived) {
thisEventSourceTransport.onDataReceived(e.data);
}
};
eventSource.onerror = (e: Event) => {
reject();
// don't report an error if the transport did not start successfully
if (thisEventSourceTransport.eventSource && thisEventSourceTransport.onError) {
thisEventSourceTransport.onError(e);
}
}
eventSource.onopen = () => {
thisEventSourceTransport.eventSource = eventSource;
resolve();
}
} }
} catch (e) {
catch (e) { return Promise.reject(e);
return Promise.reject(e); }
} });
return Promise.resolve();
} }
send(data: any): Promise<void> { send(data: any): Promise<void> {
@ -35,7 +50,10 @@ class ServerSentEventsTransport implements ITransport {
} }
stop(): void { stop(): void {
this.eventSource.close(); if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
} }
onDataReceived: DataReceived; onDataReceived: DataReceived;

View File

@ -5,33 +5,52 @@ class WebSocketTransport implements ITransport {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
url = url.replace(/^http/, "ws"); url = url.replace(/^http/, "ws");
let connectUrl = url + "/ws?" + queryString; let connectUrl = url + "/ws?" + queryString;
this.webSocket = new WebSocket(connectUrl);
this.webSocket.onopen = (event: Event) => { let webSocket = new WebSocket(connectUrl);
let thisWebSocketTransport = this;
webSocket.onopen = (event: Event) => {
console.log(`WebSocket connected to ${connectUrl}`); console.log(`WebSocket connected to ${connectUrl}`);
thisWebSocketTransport.webSocket = webSocket;
resolve(); resolve();
}; };
this.webSocket.onerror = (event: Event) => { webSocket.onerror = (event: Event) => {
// TODO: also handle when connection was opened successfully
reject(); reject();
}; };
this.webSocket.onmessage = (message: MessageEvent) => { webSocket.onmessage = (message: MessageEvent) => {
console.log(`(WebSockets transport) data received: ${message.data}`); console.log(`(WebSockets transport) data received: ${message.data}`);
if (this.onDataReceived) { if (thisWebSocketTransport.onDataReceived) {
this.onDataReceived(message.data); thisWebSocketTransport.onDataReceived(message.data);
}
}
webSocket.onclose = (event: CloseEvent) => {
// webSocket will be null if the transport did not start successfully
if (thisWebSocketTransport.webSocket && event.wasClean === false) {
if (thisWebSocketTransport.onError) {
thisWebSocketTransport.onError(event);
}
} }
} }
}); });
} }
send(data: any): Promise<void> { send(data: any): Promise<void> {
this.webSocket.send(data); if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
return Promise.resolve(); this.webSocket.send(data);
return Promise.resolve();
}
return Promise.reject("WebSocket is not in OPEN state");
} }
stop(): void { stop(): void {
this.webSocket.close(); if (this.webSocket) {
this.webSocket.close();
this.webSocket = null;
}
} }
onDataReceived: DataReceived; onDataReceived: DataReceived;

View File

@ -21,6 +21,7 @@ namespace Microsoft.AspNetCore.Sockets
context.Response.ContentType = "text/event-stream"; context.Response.ContentType = "text/event-stream";
context.Response.Headers["Cache-Control"] = "no-cache"; context.Response.Headers["Cache-Control"] = "no-cache";
context.Response.Headers["Content-Encoding"] = "identity"; context.Response.Headers["Content-Encoding"] = "identity";
await context.Response.Body.FlushAsync();
while (true) while (true)
{ {