78 lines
2.0 KiB
TypeScript
78 lines
2.0 KiB
TypeScript
import Observable from "./observable.js";
|
|
import Signal from "./signal.js";
|
|
|
|
/**
 * Envelope for a single event travelling from a producer to an iterator.
 *
 * All three fields are always present; the unused ones are set to
 * `undefined`. The consumer in this file inspects them in the order
 * `close`, then `error`, then `data`.
 */
interface IAsyncIteratorMessage<T> {
  /** `true` marks the end of the stream; `undefined` on every other message. */
  close: true | undefined;
  /** Failure to rethrow to the consumer, or `undefined` when there is none. */
  error: Error | undefined;
  /** Payload to yield when the message is neither a close nor an error. */
  data: T | undefined;
}
|
|
/**
 * Adapts push-style producers to an `AsyncIterable`.
 *
 * Producers push values with {@link AsyncIteratorFromCB.send} or through the
 * `(error, data)` callback returned by {@link AsyncIteratorFromCB.getCallback},
 * and end the stream with {@link AsyncIteratorFromCB.close}. Consumers read
 * with `for await (const value of source)`.
 *
 * Each iterator buffers only the messages sent after it was created.
 */
export default class AsyncIteratorFromCB<T> implements AsyncIterable<T> {
  /** Callbacks invoked once, in registration order, when the source closes. */
  #onClose: (() => void)[] = [];
  /** Channel every message is sent through; each iterator subscribes to it. */
  #onData = new Observable<IAsyncIteratorMessage<T>>();
  /** Becomes `true` on the first `close()` call and never resets. */
  #closed = false;

  constructor() {}

  /**
   * Registers a callback to run when {@link AsyncIteratorFromCB.close} is
   * called. Callbacks registered after the source has closed are not invoked.
   *
   * @param callback - Function to call on close.
   */
  public onClose(callback: () => void): void {
    this.#onClose.push(callback);
  }

  /**
   * Creates an `(error, data)` style callback that forwards each invocation
   * to the iterators as one message.
   *
   * @returns A callback; a truthy `error` makes the consumer's `next()`
   *   reject with it, otherwise `data` is yielded.
   */
  public getCallback(): (error: Error | undefined, data: T | undefined) => void {
    return (error: Error | undefined, data: T | undefined) => {
      this.#onData.send({ error, data, close: undefined });
    };
  }

  /**
   * Pushes one value to the iterators.
   *
   * @param data - Value to yield.
   */
  public send(data: T): void {
    this.#onData.send({
      error: undefined,
      data,
      close: undefined,
    });
  }

  /**
   * Ends the stream: notifies iterators, closes the underlying channel and
   * runs the registered `onClose` callbacks. Calls after the first one are
   * ignored so the callbacks never run twice.
   */
  public close(): void {
    if (this.#closed) return;
    this.#closed = true;

    this.#onData.send({
      close: true,
      data: undefined,
      error: undefined,
    });
    this.#onData.close();
    this.#onClose.forEach((cb) => cb());
  }

  /**
   * Creates an iterator over the messages sent from now on.
   *
   * - `next()` resolves with the next value, rejects with the next error, or
   *   resolves with `{ done: true }` once the close message is reached and on
   *   every call after that.
   * - `return()` (called automatically when a `for await` loop exits early)
   *   marks the iterator finished and stops it from buffering messages.
   * - An iterator created after `close()` is finished from the start.
   *
   * @throws Error "Error in AsyncIter" if the internal queue is unexpectedly
   *   empty when a message is dequeued.
   */
  [Symbol.asyncIterator](): AsyncIterator<T, undefined, any> {
    const queue: IAsyncIteratorMessage<T>[] = [];
    const signal = new Signal();
    // A source that is already closed will never deliver a close message to
    // this iterator, so it has to start out finished.
    let finished = this.#closed;

    const finish = (): IteratorResult<T, undefined> => {
      finished = true;
      queue.length = 0;
      return { done: true, value: undefined };
    };

    if (!finished) {
      this.#onData.subscribe((message) => {
        // Once the consumer is done there is nobody left to drain the queue.
        if (finished) return;
        queue.push(message);
        signal.sendSignal();
      });
    }

    const dequeue = (): IteratorResult<T, undefined> => {
      const message = queue.shift();
      if (!message) throw new Error("Error in AsyncIter");
      if (message.close) return finish();
      if (message.error) throw message.error;
      // `data` is typed `T | undefined` on the envelope, but a non-close,
      // non-error message carries exactly what the producer passed in.
      return { done: false, value: message.data as T };
    };

    return {
      async next(): Promise<IteratorResult<T, undefined>> {
        // Re-check after every wake-up: being signalled does not guarantee a
        // message is still queued for this caller (another pending `next()`
        // may have taken it, or the iterator may have been finished).
        while (queue.length === 0) {
          if (finished) return { done: true, value: undefined };
          await signal.awaitSignal();
        }
        return dequeue();
      },

      async return(): Promise<IteratorResult<T, undefined>> {
        const result = finish();
        // Wake any `next()` call that is still waiting so it can observe the
        // finished state instead of staying pending.
        signal.sendSignal();
        return result;
      },
    };
  }
}
|