Remove unnecessary timer from obeservable

This commit is contained in:
Fabian Stamm 2020-04-13 03:11:12 +02:00
parent 2545f3a050
commit 858a508190
7 changed files with 106 additions and 98 deletions

6
.editorconfig Normal file
View File

@ -0,0 +1,6 @@
root=true
[*]
charset = utf-8
indent_size = 3
indent_style = space
insert_final_newline = true

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "@hibas123/utils",
"version": "2.2.2",
"version": "2.2.3",
"lockfileVersion": 1,
"requires": true,
"dependencies": {

View File

@ -1,6 +1,6 @@
{
"name": "@hibas123/utils",
"version": "2.2.3",
"version": "2.2.4",
"description": "Different Utilities, that are not worth own packages",
"main": "lib/index.js",
"types": "lib/index.d.ts",

View File

@ -48,17 +48,21 @@ export default class AwaitStore<T = any> {
*
* @param val Value to await
*/
awaitValue(val: T): PromiseLike<void> & { catch: (cb: (err: any) => PromiseLike<void>) => PromiseLike<void>, ignore: () => void } {
awaitValue(
val: T
): PromiseLike<void> & {
catch: (cb: (err: any) => PromiseLike<void>) => PromiseLike<void>;
ignore: () => void;
} {
let ignore: () => void = () => undefined;
let prms = new Promise<void>(yes => {
let prms = new Promise<void>((yes) => {
const cb = () => {
if (this._value === val) {
yes();
this.unsubscribe(cb);
}
}
};
this.subscribe(cb);
});
@ -66,8 +70,8 @@ export default class AwaitStore<T = any> {
return {
then: prms.then.bind(prms),
catch: prms.catch.bind(prms),
ignore: () => ignore()
}
ignore: () => ignore(),
};
}
/**
@ -76,13 +80,13 @@ export default class AwaitStore<T = any> {
* @returns {object}
*/
getPublicApi() {
if (this.observable.closed)
throw new Error("Observable is closed!");
if (this.observable.closed) throw new Error("Observable is closed!");
return {
subscribe: (callback: ObserverCallback<T>) => this.subscribe(callback),
unsubscribe: (callback: ObserverCallback<T>) => this.unsubscribe(callback),
awaitValue: (value: T) => this.awaitValue(value)
}
unsubscribe: (callback: ObserverCallback<T>) =>
this.unsubscribe(callback),
awaitValue: (value: T) => this.awaitValue(value),
};
}
/**

View File

@ -1,5 +1,9 @@
import Lock, { Release } from "./lock";
import Observable, { ObserverCallback, ObserverCallbackCollect, ObservableInterface } from "./observable";
import Observable, {
ObserverCallback,
ObserverCallbackCollect,
ObservableInterface,
} from "./observable";
import AwaitStore from "./awaiter";
export {
@ -9,5 +13,5 @@ export {
ObserverCallback,
ObserverCallbackCollect,
ObservableInterface,
AwaitStore
}
AwaitStore,
};

View File

@ -32,8 +32,8 @@ export default class Lock {
return new Promise<Release>((resolve) => {
this.toCome.push(() => {
resolve({ release: this.lock() });
})
})
});
});
}
}

View File

@ -34,40 +34,34 @@ export default class Observable<T = any> {
return this[ClosedSymbol];
}
constructor(private collect_intervall: number = 100) { }
constructor(private collect_intervall: number = 100) {}
subscribe(callback: ObserverCallback<T>) {
if (this[ClosedSymbol])
throw new Error("Observable is closed!");
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
let oldcb = this.subscriber.find(e => e === callback);
if (!oldcb)
this.subscriber.push(callback)
let oldcb = this.subscriber.find((e) => e === callback);
if (!oldcb) this.subscriber.push(callback);
return () => this.unsubscribe(callback);
}
unsubscribe(callback: ObserverCallback<T> | ObserverCallbackCollect<T>) {
if (this[ClosedSymbol])
return;
if (this[ClosedSymbol]) return;
let idx = this.subscriber.findIndex(e => e === callback);
let idx = this.subscriber.findIndex((e) => e === callback);
if (idx >= 0) {
this.subscriber.splice(idx, 1);
} else {
idx = this.subscriberCollect.findIndex(e => e === callback);
if (idx >= 0)
this.subscriberCollect.splice(idx, 1);
idx = this.subscriberCollect.findIndex((e) => e === callback);
if (idx >= 0) this.subscriberCollect.splice(idx, 1);
}
}
subscribeCollect(callback: ObserverCallbackCollect<T>) {
if (this[ClosedSymbol])
throw new Error("Observable is closed!");
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
let oldcb = this.subscriberCollect.find(e => e === callback);
if (!oldcb)
this.subscriberCollect.push(callback)
let oldcb = this.subscriberCollect.find((e) => e === callback);
if (!oldcb) this.subscriberCollect.push(callback);
}
/**
@ -76,13 +70,15 @@ export default class Observable<T = any> {
* @returns {object}
*/
getPublicApi(): ObservableInterface<T> {
if (this[ClosedSymbol])
throw new Error("Observable is closed!");
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
return {
subscribe: (callback: ObserverCallback<T>) => this.subscribe(callback),
unsubscribe: (callback: ObserverCallback<T> | ObserverCallbackCollect<T>) => this.unsubscribe(callback),
subscribeCollect: (callback: ObserverCallbackCollect<T>) => this.subscribeCollect(callback)
}
unsubscribe: (
callback: ObserverCallback<T> | ObserverCallbackCollect<T>
) => this.unsubscribe(callback),
subscribeCollect: (callback: ObserverCallbackCollect<T>) =>
this.subscribeCollect(callback),
};
}
/**
@ -90,22 +86,21 @@ export default class Observable<T = any> {
* @param data data to be sent
*/
send(data: T) {
if (this[ClosedSymbol])
throw new Error("Observable is closed!")
if (this[ClosedSymbol]) throw new Error("Observable is closed!");
Array.from(this.subscriber.values()).forEach(e => {
Array.from(this.subscriber.values()).forEach((e) => {
try {
e(data)
e(data);
} catch (err) {
// Catch error, so it doesn't affect other subscribers
console.error(err)
console.error(err);
}
});
this.events.push(data);
if (!this.timeout) {
if (!this.timeout && this.subscriberCollect.length > 0) {
this.timeout = setTimeout(() => {
this.subscriberCollect.forEach(cb => {
cb(this.events)
this.subscriberCollect.forEach((cb) => {
cb(this.events);
});
this.events = [];
this.timeout = undefined;
@ -122,7 +117,6 @@ export default class Observable<T = any> {
this.subscriber = [];
this.subscriberCollect = [];
this.events = [];
if (this.timeout)
clearTimeout(this.timeout)
if (this.timeout) clearTimeout(this.timeout);
}
}