import Settings from "../settings";
import getLevelDB, { deleteLevelDB, resNull } from "../storage";
import DocumentLock from "./lock";
import {
  DocumentQuery,
  CollectionQuery,
  Query,
  QueryError,
  ITypedQuery,
  IQuery,
} from "./query";
import Logging from "@hibas123/nodelogging";
import Session from "./session";
import nanoid = require("nanoid");
import { Observable } from "@hibas123/utils";
import { RuleRunner } from "../rules/compile";
import compileRule from "../rules";
import { RuleError } from "../rules/error";

/** Characters used for generated collection and subscription ids. */
const ALPHABET =
  "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

/** 32-character id generator; used for collection ids in `Database.resolve`. */
const longNanoId = nanoid.customAlphabet(ALPHABET, 32);
/** 16-character id generator; used for subscription ids in `Database.snapshot`. */
const shortNanoId = nanoid.customAlphabet(ALPHABET, 16);

// interface ITransaction {
//    queries: ITypedQuery[];
// }

/**
 * Minimal callback-style iterator surface needed by `walkIterator`.
 * Key/value are `any` because the concrete store decides whether they are
 * strings or buffers (callers below request string keys).
 */
interface LevelIterator {
  next(callback: (err: Error | undefined, key: any, value: any) => void): void;
  end(callback: (err?: Error) => void): void;
}

/**
 * Drains a callback-style iterator, invoking `onEntry` for each entry.
 *
 * Iteration stops when the iterator reports an entry with neither key nor
 * value. The iterator is always ended (on success and on failure) so its
 * resources are released before the returned promise settles.
 *
 * @param iterator Iterator to drain; it is ended by this function.
 * @param onEntry  Called once per entry with its key and value.
 * @returns Resolves when the iterator is exhausted; rejects with the first
 *          iteration error or with an error thrown by `onEntry`.
 */
function walkIterator(
  iterator: LevelIterator,
  onEntry: (key: string, value: string) => void
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const finish = (failure?: Error) => {
      iterator.end((endErr) => {
        if (endErr) Logging.error(endErr);
        if (failure) reject(failure);
        else resolve();
      });
    };

    const step = (err: Error | undefined, key: string, value: string) => {
      if (err) {
        Logging.error(err);
        finish(err);
        return; // never fall through into the "exhausted" branch
      }
      if (!key && !value) {
        finish();
        return;
      }
      try {
        onEntry(key, value);
      } catch (thrown) {
        finish(thrown instanceof Error ? thrown : new Error(String(thrown)));
        return;
      }
      iterator.next(step);
    };

    iterator.next(step);
  });
}

/**
 * Process-wide registry of `Database` instances, keyed by database name.
 */
export class DatabaseManager {
  static databases = new Map<string, Database>();

  /** Loads every database known to `Settings` and registers an instance for it. */
  static async init() {
    const configs = await Settings.getDatabases();
    configs.forEach((dbconfig) => {
      const db = new Database(
        dbconfig.name,
        dbconfig.accesskey,
        dbconfig.rules,
        dbconfig.publickey,
        dbconfig.rootkey
      );
      this.databases.set(dbconfig.name, db);
    });
  }

  /**
   * Creates and registers a new database.
   *
   * @throws Error when a database with that name is already registered.
   */
  static async addDatabase(name: string) {
    if (this.databases.has(name)) throw new Error("Database already exists!");

    await Settings.addDatabase(name);
    const database = new Database(name);
    this.databases.set(name, database);
    return database;
  }

  /** Returns the registered database, or `undefined` when the name is unknown. */
  static getDatabase(name: string) {
    return this.databases.get(name);
  }

  /**
   * Deletes a database: removes it from settings, unregisters it, stops it
   * and deletes its storage. Unknown names are ignored.
   */
  static async deleteDatabase(name: string) {
    const db = this.databases.get(name);
    if (!db) return;

    await Settings.deleteDatabase(name);
    // Unregister so lookups stop returning a stopped instance and the name
    // can be reused by addDatabase.
    this.databases.delete(name);
    await db.stop();
    await deleteLevelDB(db.name);
  }
}

export type ChangeTypes = "added" | "modified" | "deleted";

export type Change = {
  data: any;
  document: string;
  collection: string;
  type: ChangeTypes;
  sender: string;
};

/**
 * A single named database: storage handles, access rules, locks and change
 * listeners, plus the entry points for running queries and snapshots.
 */
export class Database {
  /**
   * Builds the listener/storage key `<collectionid>/<documentid>`.
   * Missing parts are rendered as empty strings.
   */
  public static getKey(collectionid: string, documentid?: string) {
    return `${collectionid || ""}/${documentid || ""}`;
  }

  #level = getLevelDB(this.name);

  /** Store iterated by `runCleanup` with keys of the form `<collection id>/...`. */
  get data() {
    return this.#level.data;
  }

  /** Store mapping a collection path key to its generated collection id. */
  get collections() {
    return this.#level.collection;
  }

  #rules: RuleRunner;
  #rawRules?: string;

  /** Rule source text as last applied. */
  get rawRules() {
    return this.#rawRules;
  }

  /** Compiled rule runner. */
  get rules() {
    return this.#rules;
  }

  public connections = 0;

  private locks = new DocumentLock();
  public collectionLocks = new DocumentLock();

  /** Change callbacks keyed by `Database.getKey(collection, document?)`. */
  public changeListener = new Map<string, Set<(change: Change[]) => void>>();
  /** Emits when `resolve` creates a collection. */
  public collectionChangeListener = new Observable<{
    key: string;
    id: string;
    type: "create" | "delete";
  }>();

  toJSON() {
    return {
      name: this.name,
      accesskey: this.accesskey,
      publickey: this.publickey,
      rules: this.#rules,
      connections: this.connections,
    };
  }

  constructor(
    public name: string,
    public accesskey?: string,
    rawRules?: string,
    public publickey?: string,
    public rootkey?: string
  ) {
    if (rawRules) this.applyRules(rawRules);
  }

  /**
   * Compiles and installs rules loaded from existing configuration.
   *
   * Rules that parse as JSON are treated as the legacy format and replaced
   * by a built-in rule set. If compilation fails, a warning is logged and an
   * empty service definition is compiled instead. Always returns `undefined`.
   */
  private applyRules(rawRules: string): undefined | RuleError {
    try {
      JSON.parse(rawRules);
      // Still JSON, so this is the legacy rule format: switch to a built-in
      // rule set.
      // NOTE(review): the message says "permissive", but the replacement rule
      // below is `if false` — confirm which one is intended.
      Logging.warning("Found old rule! Replacing with a 100% permissive one!");
      rawRules =
        "service realtimedb {\n match /* {\n allow read, write, list: if false; \n }\n}";
    } catch (err) {
      // Not JSON: nothing to migrate, compile the rules as given.
    }

    let { runner, error } = compileRule(rawRules);
    if (error) {
      Logging.warning("Found error in existing config!", error);
      // NOTE(review): service name is "realtimesb" here but "realtimedb"
      // above — possibly a typo; confirm.
      runner = compileRule("service realtimesb {}").runner;
    }
    this.#rules = runner;
    this.#rawRules = rawRules;
    return undefined;
  }

  /**
   * Compiles new rules and, when they compile, persists and activates them.
   *
   * @returns The compile error when the rules are invalid, otherwise nothing.
   */
  async setRules(rawRules: string) {
    const { runner, error } = compileRule(rawRules);
    if (error) return error;

    await Settings.setDatabaseRules(this.name, rawRules);
    this.#rules = runner;
    this.#rawRules = rawRules;
  }

  /** Persists the access key, then updates it on this instance. */
  async setAccessKey(key: string) {
    await Settings.setDatabaseAccessKey(this.name, key);
    this.accesskey = key;
  }

  /** Persists the root key, then updates it on this instance. */
  async setRootKey(key: string) {
    await Settings.setDatabaseRootKey(this.name, key);
    this.rootkey = key;
  }

  /** Persists the public key, then updates it on this instance. */
  async setPublicKey(key: string) {
    await Settings.setDatabasePublicKey(this.name, key);
    this.publickey = key;
  }

  /**
   * Resolves a path to its collection id and optional document key.
   *
   * An even-length path ends in a document key, which is split off; the
   * remaining segments joined with "/" form the collection key. The lookup
   * (and optional creation) runs under the collection lock for that key.
   *
   * @param path   Path segments; the array is not modified.
   * @param create When true, an unknown collection is created with a fresh
   *               id and a "create" event is emitted asynchronously.
   * @returns `collection` is left unset when the collection does not exist
   *          and `create` is false; `document` is unset for collection paths.
   */
  public async resolve(
    path: string[],
    create = false
  ): Promise<{ collection: string; document: string; collectionKey: string }> {
    const segments = [...path]; // modifiable copy
    let collectionID: string = undefined;
    const documentKey = segments.length % 2 === 0 ? segments.pop() : undefined;
    const key = segments.join("/");

    const release = await this.collectionLocks.lock(key);
    try {
      collectionID = await this.collections
        .get(key)
        .then((r) => r.toString())
        .catch(resNull);

      if (!collectionID && create) {
        collectionID = longNanoId();
        await this.collections.put(key, collectionID);
        setImmediate(() => {
          this.collectionChangeListener.send({
            id: collectionID,
            key,
            type: "create",
          });
        });
      }
    } finally {
      release();
    }

    return {
      collection: collectionID,
      document: documentKey,
      collectionKey: key,
    };
  }

  /**
   * Groups changes by collection and document and notifies listeners
   * asynchronously: document listeners get that document's changes,
   * collection listeners get all changes of the collection.
   */
  private sendChanges(changes: Change[]) {
    const grouped = new Map<string, Map<string, Change[]>>();
    for (const change of changes) {
      let documents = grouped.get(change.collection);
      if (!documents) {
        documents = new Map();
        grouped.set(change.collection, documents);
      }

      let bucket = documents.get(change.document);
      if (!bucket) {
        bucket = [];
        documents.set(change.document, bucket);
      }
      bucket.push(change);
    }

    setImmediate(() => {
      for (const [collection, documents] of grouped.entries()) {
        const collectionChanges: Change[] = [];
        for (const [document, documentChanges] of documents.entries()) {
          const documentListeners = this.changeListener.get(
            Database.getKey(collection, document)
          );
          if (documentListeners)
            documentListeners.forEach((listener) =>
              setImmediate(() => listener(documentChanges))
            );

          collectionChanges.push(...documentChanges);
        }

        const collectionListeners = this.changeListener.get(
          Database.getKey(collection)
        );
        if (collectionListeners)
          collectionListeners.forEach((listener) =>
            setImmediate(() => listener(collectionChanges))
          );
      }
    });
  }

  /**
   * Rejects structurally broken queries.
   *
   * @throws QueryError when the query is not an object, has no type, or its
   *         path is not an array.
   */
  private validate(query: ITypedQuery<any>) {
    const inv = new QueryError("Malformed query!");
    if (!query || typeof query !== "object") throw inv;
    if (!query.type) throw inv;
    // The path is indexed and spread as an array later on; a truthy
    // non-array (e.g. a string) must not get through.
    if (!Array.isArray(query.path)) throw inv;
  }

  /**
   * Runs one query or a batch of queries.
   *
   * All paths are resolved and locked first (shorter paths first), then the
   * queries run in order against one write batch. Collected changes are
   * dispatched after the batch is written. Locks are always released.
   *
   * @returns The result array for batches (more than one query), otherwise
   *          the single result.
   * @throws QueryError for malformed queries; Error when a batch contains a
   *         query that is not batch compatible.
   */
  async run(queries: IQuery[], session: Session) {
    type PendingPath = {
      path: string[];
      create: boolean;
      resolved?: [string, string, string];
    };
    const pending: PendingPath[] = [];

    // Registers a path once; `create` is sticky across duplicates.
    const addToResolve = (path: string[], create?: boolean) => {
      let entry = pending.find(
        (candidate) =>
          //TODO: Find may be slow...
          candidate.path.length === path.length &&
          candidate.path.every((segment, i) => segment === path[i])
      );

      if (!entry) {
        entry = { path, create };
        pending.push(entry);
      }

      entry.create = entry.create || create;
      return entry;
    };

    const isBatch = queries.length > 1;
    const parsed = queries.map((rawQuery) => {
      // Validate before touching any field so malformed input yields a
      // QueryError instead of a TypeError.
      this.validate(rawQuery);
      Logging.debug("Running query:", rawQuery.type);

      const isCollection = rawQuery.path.length % 2 === 1;
      const query = isCollection
        ? new CollectionQuery(this, session, rawQuery)
        : new DocumentQuery(this, session, rawQuery);

      if (isBatch && !query.batchCompatible)
        throw new Error("There are queries that are not batch compatible!");

      const path = addToResolve(rawQuery.path, query.createCollection);
      if (query.additionalLock) addToResolve(query.additionalLock);

      return { path, query };
    });

    pending.sort((a, b) => a.path.length - b.path.length);

    const locks: (() => void)[] = [];
    const result: any[] = [];
    try {
      // Acquire inside the try: if a later resolve/lock fails, the locks
      // taken so far are still released by the finally below.
      for (const entry of pending) {
        const { collection, document, collectionKey } = await this.resolve(
          entry.path,
          entry.create
        );
        entry.resolved = [collection, document, collectionKey];
        locks.push(await this.locks.lock(collection, document));
      }

      const batch = this.data.batch();
      const changes: Change[] = [];
      for (const { path, query } of parsed) {
        result.push(
          await query.run(
            path.resolved[0],
            path.resolved[1],
            batch,
            path.resolved[2]
          )
        );
        changes.push(...query.changes);
      }

      if (batch.length > 0) await batch.write();

      this.sendChanges(changes);
    } finally {
      locks.forEach((release) => release());
    }

    if (isBatch) return result;
    else return result[0];
  }

  /**
   * Opens a snapshot subscription and registers its unsubscribe handler on
   * the session under a freshly generated id.
   *
   * @returns The subscription id and the initial value. The response key is
   *          spelled `snaphot`; it is kept as-is because it is part of the
   *          returned shape callers read.
   * @throws QueryError for malformed queries; Error for a wrong query type.
   */
  async snapshot(
    rawQuery: ITypedQuery<"snapshot">,
    session: Session,
    onchange: (change: any) => void
  ) {
    // Validate first so a malformed query yields a QueryError, not a
    // TypeError from reading `path` on a non-object.
    this.validate(rawQuery);
    Logging.debug("Snaphot request:", rawQuery.path);

    if (rawQuery.type !== "snapshot") throw new Error("Invalid query type!");

    const isCollection = rawQuery.path.length % 2 === 1;
    const query = isCollection
      ? new CollectionQuery(this, session, rawQuery, true)
      : new DocumentQuery(this, session, rawQuery, true);

    const { unsubscribe, value } = await query.snapshot(onchange);

    const id = shortNanoId();
    session.subscriptions.set(id, unsubscribe);
    return {
      id,
      snaphot: value,
    };
  }

  /** Cancels a session subscription; unknown ids are ignored. */
  async unsubscribe(id: string, session: Session) {
    const cancel = session.subscriptions.get(id);
    if (cancel) {
      cancel();
      session.subscriptions.delete(id);
    }
  }

  /** Closes the data store. */
  async stop() {
    await this.data.close();
  }

  /**
   * Deletes data entries whose collection id is no longer registered.
   *
   * Collects the registered ids (values of the collections store) and the
   * ids present in the data store (key prefix before the first "/"), then
   * deletes every data key of each id that exists but is not registered.
   *
   * @returns The collection ids whose data was deleted.
   */
  public async runCleanup() {
    const should = new Set<string>();
    await walkIterator(
      this.collections.iterator({
        keyAsBuffer: false,
        valueAsBuffer: false,
      }),
      (_key, value) => {
        should.add(value);
      }
    );

    const existing = new Set<string>();
    await walkIterator(
      this.data.iterator({
        keyAsBuffer: false,
        values: false,
      }),
      (key) => {
        existing.add(key.split("/")[0]);
      }
    );

    const toDelete = new Set<string>();
    existing.forEach((collection) => {
      if (!should.has(collection)) toDelete.add(collection);
    });

    for (const collection of toDelete) {
      const batch = this.data.batch();

      // Range bounds "<collection>/\x00" .. "<collection>/\xff": the trailing
      // placeholder byte is overwritten with the lowest / highest byte value.
      const gt = Buffer.from(collection + "/ ");
      gt[gt.length - 1] = 0;
      const lt = Buffer.alloc(gt.length);
      lt.set(gt);
      lt[gt.length - 1] = 0xff;

      await walkIterator(
        this.data.iterator({
          keyAsBuffer: false,
          values: false,
          gt,
          lt,
        }),
        (key) => {
          batch.del(key);
        }
      );

      await batch.write();
    }

    return Array.from(toDelete.values());
  }
}