Utils/src/asynciter.ts

78 lines
2.0 KiB
TypeScript

import Observable from "./observable.js";
import Signal from "./signal.js";
interface IAsyncIteratorMessage<T> {
error: Error | undefined;
data: T | undefined;
close: true | undefined;
}
export default class AsyncIteratorFromCB<T> implements AsyncIterable<T> {
#onClose: (() => void)[] = [];
#onData = new Observable<IAsyncIteratorMessage<T>>();
constructor() {}
public onClose(callback: () => void) {
this.#onClose.push(callback);
}
public getCallback() {
return (error: Error | undefined, data: T | undefined) => {
this.#onData.send({ error, data, close: undefined });
};
}
public send(data: T) {
this.#onData.send({
error: undefined,
data,
close: undefined,
});
}
public close() {
this.#onData.send({
close: true,
data: undefined,
error: undefined,
});
this.#onData.close();
this.#onClose.forEach((cb) => cb());
}
[Symbol.asyncIterator](): AsyncIterator<T, undefined, any> {
const queue: IAsyncIteratorMessage<T>[] = [];
const signal = new Signal();
this.#onData.subscribe((data) => {
queue.push(data);
signal.sendSignal();
});
return {
async next() {
const send = () => {
const value = queue.shift();
if (!value) throw new Error("Error in AsyncIter");
if (value.close) {
return {
done: true,
value: undefined as any,
};
} else if (value.error) {
throw value.error;
} else {
return {
done: false,
value: value.data as T,
};
}
};
if (queue.length > 0) {
return send();
} else {
await signal.awaitSignal();
return send();
}
},
};
}
}