Optional error and complete functions for observables (#1000)

This commit is contained in:
BrennanConroy 2017-10-10 16:42:50 -07:00 committed by GitHub
parent 6d9dd3c1cc
commit de535422d7
3 changed files with 38 additions and 5 deletions

View File

@ -390,6 +390,32 @@ describe("HubConnection", () => {
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
expect(await observer.completed).toEqual([1, 2, 3]);
});
it("does not require error function registered", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = hubConnection.stream("testMethod").subscribe({
next: val => { }
});
// Typically this would be called by the transport
// triggers observer.error()
connection.onclose(new Error("Connection lost"));
});
it("does not require complete function registered", async () => {
let connection = new TestConnection();
let hubConnection = new HubConnection(connection);
let observer = hubConnection.stream("testMethod").subscribe({
next: val => { }
});
// Send completion to trigger observer.complete()
// Expectation is connection.receive will not to throw
connection.receive({ type: 3, invocationId: connection.lastInvocationId });
});
});
describe("onClose", () => {

View File

@ -7,7 +7,10 @@ export function asyncit(expectation: string, assertion?: () => Promise<any>, tim
testFunction = done => {
assertion()
.then(() => done())
.catch(() => fail());
.catch((err) => {
fail(err);
done();
});
};
}

View File

@ -6,8 +6,8 @@
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
error?: (err: any) => void;
complete?: () => void;
}
export interface Observable<T> {
@ -30,13 +30,17 @@ export class Subject<T> implements Observable<T> {
public error(err: any): void {
for (let observer of this.observers) {
observer.error(err);
if (observer.error) {
observer.error(err);
}
}
}
public complete(): void {
for (let observer of this.observers) {
observer.complete();
if (observer.complete) {
observer.complete();
}
}
}