- separating Connection from RpcConnection
- fixing WebSockets transport to work after rewrite
- clean up
This commit is contained in:
moozzyk 2016-10-31 16:46:10 -07:00
parent 466c8d9db9
commit 752f329036
7 changed files with 136 additions and 43 deletions

View File

@ -0,0 +1,2 @@
declare type DataReceived = (data: any) => void;
declare type ErrorHandler = (e: any) => void;

View File

@ -0,0 +1,92 @@
enum ConnectionState {
Disconnected,
Connecting,
Connected
}
class Connection {
private connectionState: ConnectionState;
private url: string;
private queryString: string;
private connectionId: string;
private transport: ITransport;
private dataReceivedCallback: DataReceived;
private errorHandler: ErrorHandler;
constructor(url: string, queryString: string = "") {
this.url = url;
this.queryString = queryString;
this.connectionState = ConnectionState.Disconnected;
}
start(): Promise<void> {
if (this.connectionState != ConnectionState.Disconnected) {
throw new Error("Cannot start a connection that is not in the 'Disconnected' state");
}
return new HttpClient().get(`${this.url}/getid?${this.queryString}`)
.then(connectionId => {
this.connectionId = connectionId;
return this.tryStartTransport([
new WebSocketTransport(),
new ServerSentEventsTransport(null),
new LongPollingTransport(null)], 0);
})
.then(transport => {
this.transport = transport;
this.connectionState = ConnectionState.Connected;
})
.catch(e => {
console.log("Failed to start the connection.")
this.connectionState = ConnectionState.Disconnected;
throw e;
});
}
private tryStartTransport(transports: ITransport[], index: number): Promise<ITransport> {
let thisConnection = this;
transports[index].onDataReceived = data => thisConnection.dataReceivedCallback(data);
transports[index].onError = e => thisConnection.errorHandler(e);
return transports[index].connect(this.url, this.queryString)
.then(() => {
return transports[index];
})
.catch(e => {
index++;
if (index < transports.length) {
return this.tryStartTransport(transports, index);
}
else
{
throw new Error('No transport could be started.')
}
})
}
send(data: any): Promise<void> {
if (this.connectionState != ConnectionState.Connected) {
throw new Error("Cannot send data if the connection is not in the 'Connected' State");
}
return this.transport.send(data);
}
set dataReceived(callback: DataReceived) {
this.dataReceivedCallback = callback;
}
set onError(callback: ErrorHandler) {
this.errorHandler = callback;
}
stop(): void {
if (this.connectionState != ConnectionState.Connected) {
throw new Error("Cannot stop the connection if is not in the 'Connected' State");
}
this.transport.stop();
this.connectionState = ConnectionState.Disconnected;
}
}

View File

@ -1,5 +1,7 @@
interface ITransport {
connect(url: string, queryString: string): Promise<void>;
send(data: string): Promise<void>;
send(data: any): Promise<void>;
stop(): void;
onDataReceived: DataReceived;
onError: ErrorHandler;
}

View File

@ -51,11 +51,14 @@ class LongPollingTransport implements ITransport {
};
}
send(data: string): Promise<void> {
send(data: any): Promise<void> {
return new HttpClient().post(this.url + "/send?" + this.queryString, data);
}
stop(): void {
this.pollXhr.abort();
}
onDataReceived: DataReceived;
onError: ErrorHandler;
}

View File

@ -11,23 +11,25 @@ interface InvocationResultDescriptor {
}
class RpcConnection {
// TODO: add connection state
private url: string;
private queryString: string;
private connection: Connection;
private callbacks: Map<string, (any) => void>;
private methods: Map<string, (...args:any[]) => void>;
private transport: ITransport;
private id: number;
constructor(url: string, queryString?: string) {
this.url = url;
this.queryString = queryString || "";
this.connection = new Connection(url, queryString);
let thisRpcConnection = this;
this.connection.dataReceived = data => {
thisRpcConnection.dataReceived(data);
};
this.callbacks = new Map<string, (any) => void>();
this.methods = new Map<string, (...args:any[]) => void>();
this.id = 0;
}
private messageReceived(data: string) {
private dataReceived(data: any) {
//TODO: separate JSON parsing
var descriptor = JSON.parse(data);
if (descriptor.Method === undefined) {
@ -49,37 +51,26 @@ class RpcConnection {
}
start(): Promise<void> {
return new Promise((resolve, reject) => {
new HttpClient().get(this.url + "/getid?" + this.queryString)
.then(id => {
this.transport = new ServerSentEventsTransport(data => this.messageReceived(data));
// this.transport = new WebSocketTransport(data => this.messageReceived(data));
// this.transport = new WebSocketTransport(data => this.messageReceived(data));
// this.transport = new LongPollingTransport(data => this.messageReceived(data));
return this.transport.connect(this.url, `id=${id}&${this.queryString}`);
})
.then(() => {
resolve();
})
.catch(e => {
reject(e);
});
});
return this.connection.start();
}
stop(): void {
this.transport.stop();
return this.connection.stop();
}
invoke(methodName: string, ...args: any[]): Promise<void> {
let id = this.id;
this.id++;
let invocationDescriptor: InvocationDescriptor = {
"Id": this.id.toString(),
"Id": id.toString(),
"Method": methodName,
"Arguments": args
};
let p = new Promise<any>((resolve, reject) => {
this.callbacks[this.id] = (invocationResult: InvocationResultDescriptor) => {
this.callbacks[id] = (invocationResult: InvocationResultDescriptor) => {
if (invocationResult.Error != null) {
reject(invocationResult.Error);
}
@ -88,13 +79,14 @@ class RpcConnection {
}
};
this.transport.send(JSON.stringify(invocationDescriptor))
.catch(e => reject(e));
//TODO: separate conversion to enable different data formats
this.connection.send(JSON.stringify(invocationDescriptor))
.catch(e => {
// TODO: remove callback
reject(e);
});
});
this.id++;
//TODO: separate conversion to enable different data formats
return p;
}

View File

@ -23,11 +23,14 @@ class ServerSentEventsTransport implements ITransport {
return Promise.resolve();
}
send(data: string): Promise<void> {
send(data: any): Promise<void> {
return new HttpClient().post(this.url + "/send?" + this.queryString, data);
}
stop(): void {
this.eventSource.close();
}
onDataReceived: DataReceived;
onError: ErrorHandler;
}

View File

@ -1,12 +1,5 @@
class WebSocketTransport implements ITransport {
private webSocket: WebSocket;
// TODO: make the callback a named type
// TODO: string won't work for binary formats
private receiveCallback: (data: string) => void;
constructor(receiveCallback: (data: string) => void) {
this.receiveCallback = receiveCallback;
}
connect(url: string, queryString: string): Promise<void> {
return new Promise((resolve, reject) => {
@ -24,12 +17,15 @@ class WebSocketTransport implements ITransport {
};
this.webSocket.onmessage = (message: MessageEvent) => {
this.receiveCallback(message.data);
console.log(`(WebSockets transport) data received: ${message.data}`);
if (this.onDataReceived) {
this.onDataReceived(message.data);
}
}
});
}
send(data: string): Promise<void> {
send(data: any): Promise<void> {
this.webSocket.send(data);
return Promise.resolve();
}
@ -37,4 +33,7 @@ class WebSocketTransport implements ITransport {
stop(): void {
this.webSocket.close();
}
onDataReceived: DataReceived;
onError: ErrorHandler;
}