Utils/src/observable.ts

127 lines
3.9 KiB
TypeScript
Executable File

export type ObserverCallbackCollect<T> = (data: T[]) => void;
export type ObserverCallback<T> = (data: T) => void;
export type ObservableInterface<T> = {
/**
* Subscribe to Observable
* @param {function} callback
*/
subscribe: (callback: ObserverCallback<T>) => void;
/**
* Unsubscribe fron Observable
* @param {function} callback
*/
unsubscribe: (callback: ObserverCallback<T>) => void;
/**
* Subscribe to Observable in collect mode
* @param {function} callback
*/
subscribeCollect: (callback: ObserverCallbackCollect<T>) => void;
};
const ClosedSymbol = Symbol("Observable Closed");
export default class Observable<T = any> {
private subscriber: ObserverCallback<T>[] = [];
private subscriberCollect: ObserverCallbackCollect<T>[] = [];
private events: T[] = [];
private timeout: number | undefined = undefined;
// Use symbol to make sure this property cannot be changed from the outside
private [ClosedSymbol] = false;
get closed() {
return this[ClosedSymbol];
}
constructor(private collect_intervall: number = 100) {}
subscribe(callback: ObserverCallback<T>) {
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
let oldcb = this.subscriber.find((e) => e === callback);
if (!oldcb) this.subscriber.push(callback);
return () => this.unsubscribe(callback);
}
unsubscribe(callback: ObserverCallback<T> | ObserverCallbackCollect<T>) {
if (this[ClosedSymbol]) return;
let idx = this.subscriber.findIndex((e) => e === callback);
if (idx >= 0) {
this.subscriber.splice(idx, 1);
} else {
idx = this.subscriberCollect.findIndex((e) => e === callback);
if (idx >= 0) this.subscriberCollect.splice(idx, 1);
}
}
subscribeCollect(callback: ObserverCallbackCollect<T>) {
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
let oldcb = this.subscriberCollect.find((e) => e === callback);
if (!oldcb) this.subscriberCollect.push(callback);
return () => this.unsubscribe(callback);
}
/**
* Creates Public API with subscribe and unsubscribe
*
* @returns {object}
*/
getPublicApi(): ObservableInterface<T> {
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
return {
subscribe: (callback: ObserverCallback<T>) => this.subscribe(callback),
unsubscribe: (
callback: ObserverCallback<T> | ObserverCallbackCollect<T>
) => this.unsubscribe(callback),
subscribeCollect: (callback: ObserverCallbackCollect<T>) =>
this.subscribeCollect(callback),
};
}
/**
* Sends data to all subscribers
* @param data data to be sent
*/
send(data: T) {
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
Array.from(this.subscriber.values()).forEach((e) => {
try {
e(data);
} catch (err) {
// Catch error, so it doesn't affect other subscribers
console.error(err);
}
});
if (this.subscribeCollect.length > 0) {
this.events.push(data);
if (!this.timeout) {
this.timeout = setTimeout(() => {
this.subscriberCollect.forEach((cb) => {
cb(this.events);
});
this.events = [];
this.timeout = undefined;
}, this.collect_intervall);
}
}
}
/**
* Closes Observable. This will remove all subscribers and mark this observable as closed.
* You won't be able to reopen this observable. All maybe collected data will be discardet.
*/
close() {
this[ClosedSymbol] = true;
this.subscriber = [];
this.subscriberCollect = [];
this.events = [];
if (this.timeout) clearTimeout(this.timeout);
}
}