Add close to observable
This commit is contained in:
@ -19,12 +19,21 @@ export type ObservableInterface<T> = {
|
||||
subscribeCollect: (callback: ObserverCallbackCollect<T>) => void;
|
||||
};
|
||||
|
||||
const ClosedSymbol = Symbol("Observable Closed");
|
||||
|
||||
export default class Observable<T = any> {
|
||||
private subscriber: ObserverCallback<T>[] = [];
|
||||
private subscriberCollect: ObserverCallbackCollect<T>[] = [];
|
||||
private events: T[] = [];
|
||||
private timeout: number | undefined = undefined;
|
||||
|
||||
// Use symbol to make sure this property cannot be changed from the outside
|
||||
private [ClosedSymbol] = false;
|
||||
|
||||
get closed() {
|
||||
return this[ClosedSymbol];
|
||||
}
|
||||
|
||||
constructor(private collect_intervall: number = 100) { }
|
||||
|
||||
/**
|
||||
@ -33,13 +42,21 @@ export default class Observable<T = any> {
|
||||
* @returns {object}
|
||||
*/
|
||||
getPublicApi(): ObservableInterface<T> {
|
||||
if (this[ClosedSymbol])
|
||||
throw new Error("Observable is closed!");
|
||||
return {
|
||||
subscribe: (callback: ObserverCallback<T>) => {
|
||||
if (this[ClosedSymbol])
|
||||
throw new Error("Observable is closed!");
|
||||
|
||||
let oldcb = this.subscriber.find(e => e === callback);
|
||||
if (!oldcb)
|
||||
this.subscriber.push(callback)
|
||||
},
|
||||
unsubscribe: (callback: ObserverCallback<T> | ObserverCallbackCollect<T>) => {
|
||||
if (this[ClosedSymbol])
|
||||
return;
|
||||
|
||||
let idx = this.subscriber.findIndex(e => e === callback);
|
||||
if (idx >= 0) {
|
||||
this.subscriber.splice(idx, 1);
|
||||
@ -50,6 +67,9 @@ export default class Observable<T = any> {
|
||||
}
|
||||
},
|
||||
subscribeCollect: (callback: ObserverCallbackCollect<T>) => {
|
||||
if (this[ClosedSymbol])
|
||||
throw new Error("Observable is closed!");
|
||||
|
||||
let oldcb = this.subscriberCollect.find(e => e === callback);
|
||||
if (!oldcb)
|
||||
this.subscriberCollect.push(callback)
|
||||
@ -62,6 +82,8 @@ export default class Observable<T = any> {
|
||||
* @param data data to be sent
|
||||
*/
|
||||
send(data: T) {
|
||||
if (this[ClosedSymbol])
|
||||
throw new Error("Observable is closed!")
|
||||
this.subscriber.forEach(e => e(data));
|
||||
this.events.push(data);
|
||||
if (!this.timeout) {
|
||||
@ -74,4 +96,17 @@ export default class Observable<T = any> {
|
||||
}, this.collect_intervall);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes Observable. This will remove all subscribers and mark this observable as closed.
|
||||
* You won't be able to reopen this observable. All maybe collected data will be discardet.
|
||||
*/
|
||||
close() {
|
||||
this[ClosedSymbol] = true;
|
||||
this.subscriber = [];
|
||||
this.subscriberCollect = [];
|
||||
this.events = [];
|
||||
if (this.timeout)
|
||||
clearTimeout(this.timeout)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user