export type ObserverCallbackCollect = (data: T[]) => void; export type ObserverCallback = (data: T) => void; export type ObservableInterface = { /** * Subscribe to Observable * @param {function} callback */ subscribe: (callback: ObserverCallback) => void; /** * Unsubscribe fron Observable * @param {function} callback */ unsubscribe: (callback: ObserverCallback) => void; /** * Subscribe to Observable in collect mode * @param {function} callback */ subscribeCollect: (callback: ObserverCallbackCollect) => void; }; const ClosedSymbol = Symbol("Observable Closed"); export default class Observable { private subscriber: ObserverCallback[] = []; private subscriberCollect: ObserverCallbackCollect[] = []; private events: T[] = []; private timeout: number | undefined = undefined; // Use symbol to make sure this property cannot be changed from the outside private [ClosedSymbol] = false; get closed() { return this[ClosedSymbol]; } constructor(private collect_intervall: number = 100) {} subscribe(callback: ObserverCallback) { if (this[ClosedSymbol]) throw new Error("Observable is closed!"); let oldcb = this.subscriber.find((e) => e === callback); if (!oldcb) this.subscriber.push(callback); return () => this.unsubscribe(callback); } unsubscribe(callback: ObserverCallback | ObserverCallbackCollect) { if (this[ClosedSymbol]) return; let idx = this.subscriber.findIndex((e) => e === callback); if (idx >= 0) { this.subscriber.splice(idx, 1); } else { idx = this.subscriberCollect.findIndex((e) => e === callback); if (idx >= 0) this.subscriberCollect.splice(idx, 1); } } subscribeCollect(callback: ObserverCallbackCollect) { if (this[ClosedSymbol]) throw new Error("Observable is closed!"); let oldcb = this.subscriberCollect.find((e) => e === callback); if (!oldcb) this.subscriberCollect.push(callback); } /** * Creates Public API with subscribe and unsubscribe * * @returns {object} */ getPublicApi(): ObservableInterface { if (this[ClosedSymbol]) throw new Error("Observable is closed!"); return { subscribe: (callback: ObserverCallback) => this.subscribe(callback), unsubscribe: ( callback: ObserverCallback | ObserverCallbackCollect ) => this.unsubscribe(callback), subscribeCollect: (callback: ObserverCallbackCollect) => this.subscribeCollect(callback), }; } /** * Sends data to all subscribers * @param data data to be sent */ send(data: T) { if (this[ClosedSymbol]) throw new Error("Observable is closed!"); Array.from(this.subscriber.values()).forEach((e) => { try { e(data); } catch (err) { // Catch error, so it doesn't affect other subscribers console.error(err); } }); if (this.subscribeCollect.length > 0) { this.events.push(data); if (!this.timeout) { this.timeout = setTimeout(() => { this.subscriberCollect.forEach((cb) => { cb(this.events); }); this.events = []; this.timeout = undefined; }, this.collect_intervall); } } } /** * Closes Observable. This will remove all subscribers and mark this observable as closed. * You won't be able to reopen this observable. All maybe collected data will be discardet. */ close() { this[ClosedSymbol] = true; this.subscriber = []; this.subscriberCollect = []; this.events = []; if (this.timeout) clearTimeout(this.timeout); } }