import { Database, Change, ChangeTypes } from "./database";
import { resNull } from "../storage";
import nanoid = require("nanoid/generate");
import Logging from "@hibas123/nodelogging";
import * as MSGPack from "what-the-pack";

// Shared MessagePack codec. NOTE(review): 2 ** 20 is presumably the size in
// bytes of the encoder's working buffer -- confirm against what-the-pack.
export const MP = MSGPack.initialize(2 ** 20);

// Alphabet used when generating random collection and document ids.
const ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

const { encode, decode } = MP;

interface ISubscribeOptions {
   existing: boolean;
}

/**
 * Base class for queries addressed by a path of alternating collection and
 * document segments. Holds the helpers shared by {@link DocumentQuery} and
 * {@link CollectionQuery}: path resolution, storage-key construction, document
 * loading and change notification.
 */
export class Query {
   /**
    * Returns true if the path only contains valid characters and false if it doesn't
    * @param path Path to be checked
    */
   private validatePath(path: string[]) {
      return path.every(segment => !/[^a-zA-Z0-9_\-<>]/.test(segment));
   }

   /**
    * @param database Database the query operates on
    * @param path Path segments; at most 10 entries
    * @param sender Identifier that is attached to every change this query emits
    * @throws Error if the path is longer than 10 segments or contains invalid characters
    */
   constructor(protected database: Database, protected path: string[], protected sender: string) {
      if (path.length > 10) {
         throw new Error("Path is to long. Path is only allowed to be 10 Layers deep!");
      }
      if (!this.validatePath(path)) {
         // The message lists exactly the characters accepted by validatePath.
         throw new Error("Path can only contain a-z A-Z 0-9 '_' '-' '<' and '>' ");
      }
   }

   /**
    * Splits a path into its collection part and (for paths with an even number
    * of segments) the trailing document key, and looks up the internal id of
    * the collection.
    *
    * @param path Path to resolve; it is copied, the argument is not modified
    * @param create When true, a missing collection gets a freshly generated id that is stored
    * @returns `collection` (internal id, falsy when unknown and `create` is false),
    *          `document` (undefined for odd-length paths) and `collectionKey`
    *          (the collection segments joined by "/")
    */
   protected async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> {
      const segments = [...path]; // Modifiable copy
      const documentKey = segments.length % 2 === 0 ? segments.pop() : undefined;
      const key = segments.join("/");

      let collectionID: string = undefined;
      // `lock` resolves to the function that releases the lock again.
      const release = await this.database.collectionLocks.lock(key);
      try {
         // NOTE(review): resNull presumably turns a "not found" rejection into null -- confirm in ../storage.
         collectionID = await this.database.collections.get(key).then(r => r.toString()).catch(resNull);
         if (!collectionID && create) {
            collectionID = nanoid(ALPHABET, 32);
            await this.database.collections.put(key, collectionID);
         }
      } finally {
         release();
      }

      return {
         collection: collectionID,
         document: documentKey,
         collectionKey: key
      };
   }

   /**
    * Builds the storage key `<collection>/<document>`; missing parts become empty strings.
    */
   protected getKey(collection: string, document?: string) {
      return `${collection || ""}/${document || ""}`;
   }

   /**
    * Loads and decodes a single document. Failures of the read or of the
    * decoding are passed to `resNull`.
    */
   protected getDoc(collection: string, document: string) {
      return this.database.data
         .get(this.getKey(collection, document), { asBuffer: true })
         .then(res => decode(res as Buffer))
         .catch(resNull);
   }

   /**
    * Notifies the subscribers registered for the document key and for the
    * collection key. Every callback is scheduled with `setImmediate`, so it
    * runs after the current call stack has unwound.
    */
   protected sendChange(collection: string, document: string, type: ChangeTypes, data: any) {
      const change: Change = { type, document, data, sender: this.sender };
      const keys = [this.getKey(collection, document), this.getKey(collection)];
      for (const key of keys) {
         const subscribers = this.database.changes.get(key);
         if (subscribers) subscribers.forEach(e => setImmediate(() => e(change)));
      }
   }

   /**
    * Returns the constructor arguments of an existing query so that another
    * query type can be created for the same database, path and sender.
    */
   protected static getConstructorParams(query: Query): [Database, string[], string] {
      return [query.database, query.path, query.sender];
   }
}

interface UpdateData {
   [path: string]: {
      type: "value" | "timestamp" | "increment" | "push";
      value: any;
   }
}

// Path segments that would make a field update walk onto a shared prototype.
const FORBIDDEN_FIELD_SEGMENTS = ["__proto__", "constructor", "prototype"];

/**
 * Query addressing a single document (a path with an even number of segments).
 */
export class DocumentQuery extends Query {
   constructor(database: Database, path: string[], sender: string) {
      super(database, path, sender);
      // onChange is stored in the shared subscriber sets, so it needs a stable `this`.
      this.onChange = this.onChange.bind(this);
   }

   /**
    * Reads the document.
    * @returns The decoded document, or null when the collection is unknown or the path has no document key
    */
   public async get() {
      const { collection, document } = await this.resolve(this.path);
      if (!collection || !document) {
         return null;
      }
      return this.getDoc(collection, document);
   }

   /**
    * Replaces the document with `data` and emits an "added" or "modified" change.
    * Passing `null` deletes the document instead.
    *
    * @param data New document content
    * @param options `merge` is accepted but currently has no effect: the document is always replaced
    * @throws Error if the path does not resolve to a collection and a document key
    */
   public async set(data: any, { merge = false }: { merge?: boolean } = {}) {
      if (data === null)
         return this.delete();

      const { collection, document } = await this.resolve(this.path, true);
      if (!collection) {
         throw new Error("There must be a collection!")
      }
      if (!document) {
         throw new Error("There must be a document key!")
      }

      const release = await this.database.locks.lock(collection, document);
      try {
         const isNew = !(await this.getDoc(collection, document));
         await this.database.data.put(this.getKey(collection, document), encode(data));
         this.sendChange(collection, document, isNew ? "added" : "modified", data);
      } finally {
         release();
      }
   }

   /**
    * Applies field-level updates to the document, creating it when it does not exist yet.
    *
    * Keys of `updateData` are dot-separated field paths; missing intermediate
    * objects are created. Supported operations:
    * - "value": overwrite the field
    * - "increment": add to a numeric field (an unset field is initialised with the value)
    * - "timestamp": store the current time in milliseconds
    * - "push": append to an array field (an unset field becomes a one-element array)
    *
    * @throws Error on an invalid path segment, an unknown operation, or a field of the wrong type
    */
   public async update(updateData: UpdateData) {
      const { collection, document } = await this.resolve(this.path, true);
      if (!collection) {
         throw new Error("There must be a collection!")
      }
      if (!document) {
         throw new Error("There must be a document key!")
      }

      const release = await this.database.locks.lock(collection, document);
      try {
         let data = await this.getDoc(collection, document);
         let isNew = false;
         if (!data) {
            isNew = true;
            data = {};
         }

         for (const [path, toUpdate] of Object.entries(updateData)) {
            const parts = path.split(".");
            if (parts.some(part => FORBIDDEN_FIELD_SEGMENTS.includes(part))) {
               throw new Error("Invalid field path: " + path);
            }

            // Descend from the current object (not from the document root), so
            // paths of any depth end up at the right place.
            const last = parts.pop();
            let d = data;
            for (const seg of parts) {
               if (!d[seg]) d[seg] = {};
               d = d[seg];
            }

            switch (toUpdate.type) {
               case "value":
                  d[last] = toUpdate.value;
                  break;
               case "increment":
                  if (d[last] === undefined || d[last] === null)
                     d[last] = toUpdate.value;
                  else if (typeof d[last] !== "number") {
                     throw new Error("Field is no number!");
                  } else {
                     d[last] += toUpdate.value;
                  }
                  break;
               case "timestamp":
                  d[last] = new Date().valueOf();
                  break;
               case "push":
                  if (d[last] === undefined || d[last] === null)
                     d[last] = [toUpdate.value];
                  else if (Array.isArray(d[last])) {
                     d[last].push(toUpdate.value);
                  } else {
                     throw new Error("Field is not array!");
                  }
                  break;
               default:
                  throw new Error("Invalid update type: " + toUpdate.type);
            }
         }

         // Await the write so the lock is held until it has completed and a
         // failed write rejects this call.
         await this.database.data.put(this.getKey(collection, document), encode(data));
         this.sendChange(collection, document, isNew ? "added" : "modified", data);
      } finally {
         release();
      }
   }

   /**
    * Deletes the document and emits a "deleted" change with `null` data.
    * @throws Error if the path does not resolve to an existing collection and a document key
    */
   public async delete() {
      const { collection, document } = await this.resolve(this.path);
      if (!collection) {
         throw new Error("There must be a collection!")
      }
      if (!document) {
         throw new Error("There must be a document key!")
      }

      const release = await this.database.locks.lock(collection, document);
      try {
         await this.database.data.del(this.getKey(collection, document));
         this.sendChange(collection, document, "deleted", null);
      } finally {
         release();
      }
   }

   private subscription: {
      key: string,
      onChange: (change: DocRes & { type: ChangeTypes }) => void
   };

   /**
    * Subscribes to changes of the document.
    * @param onChange Called for every change of the document
    * @returns The current content of the document
    * @throws Error if this query already has a subscription
    */
   async snapshot(onChange: (change: DocRes & { type: ChangeTypes }) => void) {
      if (this.subscription)
         throw new Error("This query is already subscribed!");

      const { collection, document } = await this.resolve(this.path);
      const data = await this.getDoc(collection, document);
      const key = this.getKey(collection, document);

      this.subscription = { key, onChange };

      let s = this.database.changes.get(key);
      if (!s) {
         s = new Set();
         this.database.changes.set(key, s);
      }
      s.add(this.onChange);

      return data;
   }

   /**
    * Forwards a change to the subscriber.
    */
   onChange(change: Change) {
      // Changes are dispatched via setImmediate, so one may still arrive after unsubscribe().
      if (!this.subscription)
         return;
      // if(change.sender === this.sender)
      //    return
      this.subscription.onChange({
         id: change.document,
         data: change.data,
         type: change.type
      });
   }

   /**
    * Removes the subscription created by `snapshot`; does nothing when not subscribed.
    */
   unsubscribe() {
      if (!this.subscription)
         return;

      const s = this.database.changes.get(this.subscription.key);
      if (s) {
         s.delete(this.onChange);
         if (s.size <= 0)
            this.database.changes.delete(this.subscription.key);
      }
      this.subscription = undefined;
   }

   /**
    * Creates a DocumentQuery for the same database, path and sender as `query`.
    */
   public static fromQuery(query: Query) {
      return new DocumentQuery(...Query.getConstructorParams(query));
   }
}

type FieldPath = string;
type WhereFilterOp =
   | '<'
   | '<='
   | '=='
   | '>='
   | '>'
   | 'array-contains'
   | 'in'
   | 'array-contains-any';

interface IQueryWhere {
   fieldPath: FieldPath,
   opStr: WhereFilterOp,
   value: any
}

interface DocRes {
   id: string;
   data: any;
}

/**
 * Query addressing a whole collection (a path with an odd number of segments).
 */
export class CollectionQuery extends Query {
   constructor(database: Database, path: string[], sender: string) {
      super(database, path, sender);
      // onChange is stored in the shared subscriber sets, so it needs a stable `this`.
      this.onChange = this.onChange.bind(this);
   }

   /** Filters a document has to satisfy; all of them must match. */
   public where: IQueryWhere[] = [];
   /** Maximum number of documents returned by `get`; a negative value means no limit. */
   public limit: number = -1;

   /**
    * Stores `value` as a new document with a generated id.
    * @returns The id of the new document
    */
   public async add(value: any) {
      const id = nanoid(ALPHABET, 32);
      const q = new DocumentQuery(this.database, [...this.path, id], this.sender);
      await q.set(value, {});
      return id;
   }

   /**
    * Builds the exclusive key range covering the documents of a collection:
    * the collection prefix followed by the byte 0x00 (lower bound) and by the
    * byte 0xFF (upper bound).
    */
   private getStreamOptions(collection: string) {
      const prefix = Buffer.from(this.getKey(collection));
      const gt = Buffer.concat([prefix, Buffer.from([0x00])]);
      const lt = Buffer.concat([prefix, Buffer.from([0xFF])]);
      return { gt, lt };
   }

   /**
    * Lists the ids of all documents in the collection.
    * @throws Error if the path addresses a document or the collection is unknown
    */
   public async keys(): Promise<string[]> {
      const { collection, document } = await this.resolve(this.path);
      if (document)
         throw new Error("Keys only works on collections!");
      if (!collection)
         throw new Error("There must be a collection");

      return new Promise<string[]>((yes, no) => {
         const keys: string[] = [];
         const stream = this.database.data.createKeyStream({
            ...this.getStreamOptions(collection),
            keyAsBuffer: false
         });
         stream.on("data", (key: string) => {
            const s = key.split("/", 2);
            if (s.length > 1)
               keys.push(s[1]);
         });
         stream.on("end", () => yes(keys));
         stream.on("error", no);
      });
   }

   /**
    * Reads the value at a dot-separated field path.
    * @returns The value, or undefined/null as soon as a segment along the path is undefined/null
    */
   private getFieldValue(data: any, path: FieldPath) {
      let d = data;
      for (const seg of path.split(".")) {
         // Undefined/Null has no other fields! (also true for the data of "deleted" changes)
         if (d === undefined || d === null)
            break;
         d = d[seg];
      }
      return d;
   }

   /**
    * Checks whether a document satisfies every configured `where` filter.
    * @throws Error for operations that are not implemented ("in", "array-contains-any")
    */
   private fitsWhere(data: any): boolean {
      return this.where.every(where => {
         const val = this.getFieldValue(data, where.fieldPath);
         switch (where.opStr) {
            case "<":
               return val < where.value;
            case "<=":
               return val <= where.value;
            case "==":
               return val == where.value;
            case ">=":
               return val >= where.value;
            case ">":
               return val > where.value;
            case "array-contains":
               // A field that is not an array cannot contain the value.
               return Array.isArray(val) && val.some(e => e === where.value);
            // case "array-contains-any":
            // case "in":
            default:
               throw new Error("Invalid where operation " + where.opStr);
         }
      });
   }

   /**
    * Reads the documents of the collection that satisfy the `where` filters,
    * returning at most `limit` documents when `limit` is not negative.
    * @throws Error if the path addresses a document or the collection is unknown
    */
   async get(): Promise<DocRes[]> {
      const { collection, document } = await this.resolve(this.path);
      if (document)
         throw new Error("Keys only works on collections!");
      if (!collection)
         throw new Error("There must be a collection");

      return new Promise<DocRes[]>((yes, no) => {
         const stream = this.database.data.iterator({
            ...this.getStreamOptions(collection),
            keyAsBuffer: false,
            valueAsBuffer: true
         });

         const values: DocRes[] = [];
         const limitReached = () => this.limit >= 0 && values.length >= this.limit;

         // Always end the iterator before settling, so its resources are released.
         const finish = (failure?: Error) => {
            stream.end(endErr => {
               if (failure) {
                  if (endErr) Logging.error(endErr);
                  no(failure);
               } else if (endErr) {
                  no(endErr);
               } else {
                  yes(values);
               }
            });
         };

         const onValue = (err: Error, key: string, value: Buffer) => {
            if (err) {
               finish(err);
               return;
            }
            if (!key && !value) { // END
               Logging.debug("Checked all!")
               finish();
               return;
            }

            try {
               const s = key.split("/", 2);
               if (s.length > 1) {
                  const data = decode(value);
                  if (this.fitsWhere(data)) {
                     Logging.debug("Found fitting")
                     values.push({ id: s[1], data });
                     if (limitReached()) {
                        finish();
                        return;
                     }
                  }
               }
            } catch (e) {
               // Decoding or filtering failed: reject instead of throwing inside the callback.
               finish(e instanceof Error ? e : new Error(String(e)));
               return;
            }

            stream.next(onValue);
         };

         if (limitReached())
            finish(); // limit of 0: nothing to read
         else
            stream.next(onValue);
      });
   }

   private subscription: {
      key: string,
      onChange: (change: (DocRes & { type: ChangeTypes })[]) => void
   };

   /**
    * Subscribes to changes of the documents in the collection.
    * @param onChange Called with the changes that satisfy the `where` filters
    * @returns The documents currently matching the query
    * @throws Error if this query already has a subscription
    */
   async snapshot(onChange: (change: (DocRes & { type: ChangeTypes })[]) => void) {
      if (this.subscription)
         throw new Error("This query is already subscribed!");

      const { collection, document } = await this.resolve(this.path);
      const data = await this.get();
      const key = this.getKey(collection, document);

      this.subscription = { key, onChange };

      let s = this.database.changes.get(key);
      if (!s) {
         s = new Set();
         this.database.changes.set(key, s);
      }
      s.add(this.onChange);

      return data;
   }

   /**
    * Forwards a change to the subscriber when its data satisfies the `where` filters.
    */
   onChange(change: Change) {
      // Changes are dispatched via setImmediate, so one may still arrive after unsubscribe().
      if (!this.subscription)
         return;
      // if(change.sender === this.sender)
      //    return
      if (this.fitsWhere(change.data)) {
         this.subscription.onChange([{
            id: change.document,
            data: change.data,
            type: change.type
         }]);
      }
   }

   /**
    * Removes the subscription created by `snapshot`; does nothing when not subscribed.
    */
   unsubscribe() {
      if (!this.subscription)
         return;

      const s = this.database.changes.get(this.subscription.key);
      if (s) {
         s.delete(this.onChange);
         if (s.size <= 0)
            this.database.changes.delete(this.subscription.key);
      }
      this.subscription = undefined;
   }

   /**
    * Lists every key of the data store, each split at "/". The result is not
    * restricted to the path of this query.
    */
   public async collections(): Promise<string[][]> {
      return new Promise<string[][]>((yes, no) => {
         const keys: string[][] = [];
         const stream = this.database.data.createKeyStream({ keyAsBuffer: false });
         stream.on("data", (key: string) => {
            keys.push(key.split("/"));
         });
         stream.on("end", () => yes(keys));
         stream.on("error", no);
      });
   }

   /**
    * Deletes all documents of the collection in a single batch and then removes
    * the collection entry itself. Does nothing when the collection is unknown.
    * @throws Error if the path addresses a document
    */
   public async deleteCollection() {
      const { collection, document, collectionKey } = await this.resolve(this.path);
      if (document) {
         throw new Error("There can be no document defined on this operation");
      }
      if (!collection)
         return;

      //TODO: Lock whole collection!
      const batch = this.database.data.batch();
      let written = false;
      try {
         const documents = await this.keys();
         // Iterate the document ids themselves (for...in would yield array indices).
         for (const id of documents) {
            batch.del(this.getKey(collection, id));
         }
         await batch.write();
         written = true;
         await this.database.collections.del(collectionKey);
      } finally {
         if (!written)
            batch.clear();
      }
   }

   /**
    * Creates a CollectionQuery for the same database, path and sender as `query`.
    */
   public static fromQuery(query: Query) {
      return new CollectionQuery(...Query.getConstructorParams(query));
   }
}