From 794820e1d3eb07acffeedd7218eecdf6d90dda5f Mon Sep 17 00:00:00 2001 From: Fabian Stamm Date: Wed, 17 Jun 2020 22:40:23 +0200 Subject: [PATCH] Adding some debug messages --- src/database/database.ts | 221 +++++++++++++++++++++------------------ src/database/lock.ts | 17 +-- src/websocket.ts | 77 +++++++++----- 3 files changed, 181 insertions(+), 134 deletions(-) diff --git a/src/database/database.ts b/src/database/database.ts index 6fc1baa..a109695 100644 --- a/src/database/database.ts +++ b/src/database/database.ts @@ -2,13 +2,21 @@ import { Rules } from "./rules"; import Settings from "../settings"; import getLevelDB, { LevelDB, deleteLevelDB, resNull } from "../storage"; import DocumentLock from "./lock"; -import { DocumentQuery, CollectionQuery, Query, QueryError, ITypedQuery, IQuery } from "./query"; +import { + DocumentQuery, + CollectionQuery, + Query, + QueryError, + ITypedQuery, + IQuery, +} from "./query"; import Logging from "@hibas123/nodelogging"; import Session from "./session"; import nanoid = require("nanoid/generate"); import { Observable } from "@hibas123/utils"; -const ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; +const ALPHABET = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; // interface ITransaction { // queries: ITypedQuery[]; @@ -20,15 +28,20 @@ export class DatabaseManager { static async init() { let databases = await Settings.getDatabases(); - databases.forEach(dbconfig => { - let db = new Database(dbconfig.name, dbconfig.accesskey, dbconfig.rules, dbconfig.publickey, dbconfig.rootkey); + databases.forEach((dbconfig) => { + let db = new Database( + dbconfig.name, + dbconfig.accesskey, + dbconfig.rules, + dbconfig.publickey, + dbconfig.rootkey + ); this.databases.set(dbconfig.name, db); - }) + }); } static async addDatabase(name: string) { - if (this.databases.has(name)) - throw new Error("Database already exists!"); + if (this.databases.has(name)) throw new Error("Database already exists!"); await Settings.addDatabase(name); let database = new Database(name); @@ -41,11 +54,11 @@ export class DatabaseManager { } static async deleteDatabase(name: string) { - let db = this.databases.get(name) + let db = this.databases.get(name); if (db) { await Settings.deleteDatabase(name); await db.stop(); - await deleteLevelDB(db.name) + await deleteLevelDB(db.name); } } } @@ -58,8 +71,7 @@ export type Change = { collection: string; type: ChangeTypes; sender: string; -} - +}; export class Database { public static getKey(collectionid: string, documentid?: string) { @@ -76,16 +88,15 @@ export class Database { return this.level.collection; } - public rules: Rules; - private locks = new DocumentLock() - public collectionLocks = new DocumentLock() + private locks = new DocumentLock(); + public collectionLocks = new DocumentLock(); public changeListener = new Map void>>(); public collectionChangeListener = new Observable<{ key: string; id: string; - type: "create" | "delete" + type: "create" | "delete"; }>(); toJSON() { @@ -93,13 +104,18 @@ export class Database { name: this.name, accesskey: this.accesskey, publickey: this.publickey, - rules: this.rules - } + rules: this.rules, + }; } - constructor(public name: string, public accesskey?: string, rawRules?: string, public publickey?: string, public rootkey?: string) { - if (rawRules) - this.rules = new Rules(rawRules); + constructor( + public name: string, + public accesskey?: string, + rawRules?: string, + public publickey?: string, + public rootkey?: string + ) { + if (rawRules) this.rules = new Rules(rawRules); } async setRules(rawRules: string) { @@ -123,7 +139,10 @@ export class Database { this.publickey = key; } - public async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> { + public async resolve( + path: string[], + create = false + ): Promise<{ collection: string; document: string; collectionKey: string }> { path = [...path]; // Create modifiable copy let collectionID: string = undefined; let documentKey = path.length % 2 === 0 ? path.pop() : undefined; @@ -132,7 +151,10 @@ export class Database { const lock = await this.collectionLocks.lock(key); try { - collectionID = await this.collections.get(key).then(r => r.toString()).catch(resNull); + collectionID = await this.collections + .get(key) + .then((r) => r.toString()) + .catch(resNull); if (!collectionID && create) { collectionID = nanoid(ALPHABET, 32); await this.collections.put(key, collectionID); @@ -140,9 +162,9 @@ export class Database { this.collectionChangeListener.send({ id: collectionID, key, - type: "create" - }) - }) + type: "create", + }); + }); } } finally { lock(); @@ -151,16 +173,16 @@ export class Database { return { collection: collectionID, document: documentKey, - collectionKey: key + collectionKey: key, }; } private sendChanges(changes: Change[]) { let col = new Map>(); - changes.forEach(change => { + changes.forEach((change) => { let e = col.get(change.collection); if (!e) { - e = new Map() + e = new Map(); col.set(change.collection, e); } @@ -171,66 +193,67 @@ export class Database { } d.push(change); - }) + }); setImmediate(() => { for (let [collection, documents] of col.entries()) { let collectionChanges = []; for (let [document, documentChanges] of documents.entries()) { - let s = this.changeListener.get(Database.getKey(collection, document)); - if (s) - s.forEach(e => setImmediate(() => e(documentChanges))); + let s = this.changeListener.get( + Database.getKey(collection, document) + ); + if (s) s.forEach((e) => setImmediate(() => e(documentChanges))); collectionChanges.push(...documentChanges); } - let s = this.changeListener.get(Database.getKey(collection)) - if (s) - s.forEach(e => setImmediate(() => e(collectionChanges))) + let s = this.changeListener.get(Database.getKey(collection)); + if (s) s.forEach((e) => setImmediate(() => e(collectionChanges))); } - }) + }); } private validate(query: ITypedQuery) { const inv = new QueryError("Malformed query!"); - if (!query || typeof query !== "object") - throw inv; + if (!query || typeof query !== "object") throw inv; - if (!query.type) - throw inv; + if (!query.type) throw inv; - if (!query.path) - throw inv; + if (!query.path) throw inv; } async run(queries: IQuery[], session: Session) { - let resolve: { path: string[], create: boolean, resolved?: [string, string, string] }[] = []; + let resolve: { + path: string[]; + create: boolean; + resolved?: [string, string, string]; + }[] = []; const addToResolve = (path: string[], create?: boolean) => { - let entry = resolve.find(e => { //TODO: Find may be slow... - if (e.path.length !== path.length) - return false; + let entry = resolve.find((e) => { + //TODO: Find may be slow... + if (e.path.length !== path.length) return false; for (let i = 0; i < e.path.length; i++) { - if (e.path[i] !== path[i]) - return false; + if (e.path[i] !== path[i]) return false; } return true; - }) + }); if (!entry) { entry = { path, - create - } + create, + }; resolve.push(entry); } entry.create = entry.create || create; return entry; - } + }; const isBatch = queries.length > 1; - let parsed = queries.map(rawQuery => { + let parsed = queries.map((rawQuery) => { + Logging.debug("Running query:", rawQuery.type); this.validate(rawQuery); const isCollection = rawQuery.path.length % 2 === 1; @@ -242,12 +265,11 @@ export class Database { throw new Error("There are queries that are not batch compatible!"); let path = addToResolve(rawQuery.path, query.createCollection); - if (query.additionalLock) - addToResolve(query.additionalLock); + if (query.additionalLock) addToResolve(query.additionalLock); return { path, - query + query, }; }); @@ -255,12 +277,13 @@ export class Database { let locks: (() => void)[] = []; for (let e of resolve) { - let { collection, document, collectionKey } = await this.resolve(e.path, e.create); + let { collection, document, collectionKey } = await this.resolve( + e.path, + e.create + ); e.resolved = [collection, document, collectionKey]; - locks.push( - await this.locks.lock(collection, document) - ); + locks.push(await this.locks.lock(collection, document)); } let result = []; @@ -269,46 +292,48 @@ export class Database { let changes: Change[] = []; for (let e of parsed) { result.push( - await e.query.run(e.path.resolved[0], e.path.resolved[1], batch, e.path.resolved[2]) + await e.query.run( + e.path.resolved[0], + e.path.resolved[1], + batch, + e.path.resolved[2] + ) ); changes.push(...e.query.changes); } - if (batch.length > 0) - await batch.write(); + if (batch.length > 0) await batch.write(); this.sendChanges(changes); } finally { - locks.forEach(lock => lock()); + locks.forEach((lock) => lock()); } - if (isBatch) - return result; - else - return result[0] + if (isBatch) return result; + else return result[0]; } - async snapshot(rawQuery: ITypedQuery<"snapshot">, session: Session, onchange: (change: any) => void) { + async snapshot( + rawQuery: ITypedQuery<"snapshot">, + session: Session, + onchange: (change: any) => void + ) { Logging.debug("Snaphot request:", rawQuery.path); this.validate(rawQuery); - if (rawQuery.type !== "snapshot") - throw new Error("Invalid query type!"); + if (rawQuery.type !== "snapshot") throw new Error("Invalid query type!"); const isCollection = rawQuery.path.length % 2 === 1; let query = isCollection ? new CollectionQuery(this, session, rawQuery, true) : new DocumentQuery(this, session, rawQuery, true); - const { - unsubscribe, - value - } = await query.snapshot(onchange); + const { unsubscribe, value } = await query.snapshot(onchange); const id = nanoid(ALPHABET, 16); session.subscriptions.set(id, unsubscribe); return { id, - snaphot: value + snaphot: value, }; } @@ -328,39 +353,39 @@ export class Database { const should = await new Promise>((yes, no) => { const stream = this.collections.iterator({ keyAsBuffer: false, - valueAsBuffer: false - }) + valueAsBuffer: false, + }); const collections = new Set(); const onValue = (err: Error, key: string, value: string) => { if (err) { Logging.error(err); - stream.end((err) => Logging.error(err)) + stream.end((err) => Logging.error(err)); no(err); } if (!key && !value) { yes(collections); } else { - collections.add(value) + collections.add(value); stream.next(onValue); } - } + }; stream.next(onValue); - }) + }); const existing = await new Promise>((yes, no) => { const stream = this.data.iterator({ keyAsBuffer: false, - values: false - }) + values: false, + }); const collections = new Set(); const onValue = (err: Error, key: string, value: Buffer) => { if (err) { Logging.error(err); - stream.end((err) => Logging.error(err)) + stream.end((err) => Logging.error(err)); no(err); } @@ -368,19 +393,18 @@ export class Database { yes(collections); } else { let coll = key.split("/")[0]; - collections.add(coll) + collections.add(coll); stream.next(onValue); } - } + }; stream.next(onValue); - }) + }); const toDelete = new Set(); - existing.forEach(collection => { - if (!should.has(collection)) - toDelete.add(collection); - }) + existing.forEach((collection) => { + if (!should.has(collection)) toDelete.add(collection); + }); for (let collection of toDelete) { const batch = this.data.batch(); @@ -390,20 +414,20 @@ export class Database { let lt = Buffer.alloc(gt.length); lt.set(gt); - lt[gt.length - 1] = 0xFF; + lt[gt.length - 1] = 0xff; await new Promise((yes, no) => { const stream = this.data.iterator({ keyAsBuffer: false, values: false, gt, - lt - }) + lt, + }); const onValue = (err: Error, key: string, value: Buffer) => { if (err) { Logging.error(err); - stream.end((err) => Logging.error(err)) + stream.end((err) => Logging.error(err)); no(err); } @@ -413,15 +437,14 @@ export class Database { batch.del(key); stream.next(onValue); } - } + }; stream.next(onValue); - }) + }); await batch.write(); } return Array.from(toDelete.values()); } - } diff --git a/src/database/lock.ts b/src/database/lock.ts index dfe5e44..66df045 100644 --- a/src/database/lock.ts +++ b/src/database/lock.ts @@ -4,7 +4,7 @@ export default class DocumentLock { private locks = new Map void)[]>(); getLocks() { - return Array.from(this.locks.keys()) + return Array.from(this.locks.keys()); } async lock(collection: string = "", document: string = "") { @@ -12,17 +12,18 @@ export default class DocumentLock { let key = collection + "/" + document; let l = this.locks.get(key); if (l) - await new Promise(resolve => { l.push(resolve); this.locks.set(key, l) }); + await new Promise((resolve) => { + l.push(resolve); + this.locks.set(key, l); + }); else { l = []; this.locks.set(key, l); } return () => { - if (l.length > 0) - setImmediate(() => l.shift()()); - else - this.locks.delete(key) - } + if (l.length > 0) setImmediate(() => l.shift()()); + else this.locks.delete(key); + }; } -} \ No newline at end of file +} diff --git a/src/websocket.ts b/src/websocket.ts index b8aa52d..731bd5b 100644 --- a/src/websocket.ts +++ b/src/websocket.ts @@ -2,7 +2,12 @@ import Logging from "@hibas123/nodelogging"; import { IncomingMessage, Server } from "http"; import * as WebSocket from "ws"; import { DatabaseManager } from "./database/database"; -import { CollectionQuery, DocumentQuery, IQuery, ITypedQuery } from "./database/query"; +import { + CollectionQuery, + DocumentQuery, + IQuery, + ITypedQuery, +} from "./database/query"; import Session from "./database/session"; import { verifyJWT } from "./helper/jwt"; import nanoid = require("nanoid"); @@ -17,7 +22,8 @@ export class WebsocketConnectionManager { private static async onConnection(socket: WebSocket, req: IncomingMessage) { Logging.log("New Connection:"); - const sendError = (error: string) => socket.send(JSON.stringify({ ns: "error_msg", data: error })); + const sendError = (error: string) => + socket.send(JSON.stringify({ ns: "error_msg", data: error })); const session = new Session(nanoid()); @@ -61,16 +67,19 @@ export class WebsocketConnectionManager { } const answer = (id: string, data: any, error: boolean = false) => { - if (error) - Logging.error(error as any); - socket.send(JSON.stringify({ ns: "message", data: { id, error, data } })); - } + if (error) Logging.error(error as any); + socket.send( + JSON.stringify({ ns: "message", data: { id, error, data } }) + ); + }; - const handler = new Map void)>(); + const handler = new Map void>(); - handler.set("v2", async ({ id, query }) => db.run(Array.isArray(query) ? query : [query], session) - .then(res => answer(id, res)) - .catch(err => answer(id, undefined, err)) + handler.set("v2", async ({ id, query }) => + db + .run(Array.isArray(query) ? query : [query], session) + .then((res) => answer(id, res)) + .catch((err) => answer(id, undefined, err)) ); // handler.set("bulk", async ({ id, query }) => db.run(query, session) @@ -78,18 +87,32 @@ export class WebsocketConnectionManager { // .catch(err => answer(id, undefined, err)) // ); - const SnapshotMap = new Map(); - handler.set("snapshot", async ({ id, query }: { id: string, query: ITypedQuery<"snapshot"> }) => { - db.snapshot(query, session, (data => { - socket.send(JSON.stringify({ - ns: "snapshot", data: { id, data } - })); - })).then(s => { - answer(id, s.snaphot); - SnapshotMap.set(id, s.id); - }).catch(err => answer(id, undefined, err)); - }) + handler.set( + "snapshot", + async ({ + id, + query, + }: { + id: string; + query: ITypedQuery<"snapshot">; + }) => { + db.snapshot(query, session, (data) => { + Logging.debug("Sending snapshot"); + socket.send( + JSON.stringify({ + ns: "snapshot", + data: { id, data }, + }) + ); + }) + .then((s) => { + answer(id, s.snaphot); + SnapshotMap.set(id, s.id); + }) + .catch((err) => answer(id, undefined, err)); + } + ); handler.set("unsubscribe", async ({ id }) => { let i = SnapshotMap.get(id); @@ -97,11 +120,11 @@ export class WebsocketConnectionManager { db.unsubscribe(i, session); SnapshotMap.delete(i); } - }) + }); socket.on("message", async (rawData: string) => { try { - let message: { ns: string, data: any } = JSON.parse(rawData); + let message: { ns: string; data: any } = JSON.parse(rawData); let h = handler.get(message.ns); if (h) { h(message.data); @@ -110,13 +133,13 @@ export class WebsocketConnectionManager { Logging.errorMessage("Unknown Error:"); Logging.error(err); } - }) + }); socket.on("close", () => { Logging.log(`${session.id} has disconnected!`); - session.subscriptions.forEach(unsubscribe => unsubscribe()); + session.subscriptions.forEach((unsubscribe) => unsubscribe()); session.subscriptions.clear(); socket.removeAllListeners(); - }) + }); } -} \ No newline at end of file +}