diff --git a/package.json b/package.json index da867e6..449bac8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hibas123/realtimedb", - "version": "2.0.0-beta.1", + "version": "2.0.0-beta.2", "description": "", "main": "lib/index.js", "private": true, diff --git a/src/connection.ts b/src/connection.ts index afa66dc..e3ea3ed 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -2,7 +2,7 @@ import * as WebSocket from "ws"; import { Server, IncomingMessage } from "http"; import { DatabaseManager } from "./database/database"; import Logging from "@hibas123/logging"; -import Query from "./database/query"; +import { Query, CollectionQuery, DocumentQuery } from "./database/query"; import Session from "./database/session"; import nanoid = require("nanoid"); @@ -100,100 +100,40 @@ export class ConnectionManager { const handler = new Map void)>(); type QueryData = { id: string, type: QueryTypes, path: string[], data: any, options: any }; - const queryHandler = new Map Promise>(); - - const noperm = new Error("No permisison!"); - - if (session.root) { - queryHandler.set("collections", (query, perm, data) => { - return query.getCollections(); - }) - - queryHandler.set("delete-collection", (query, perm, data) => { - return query.deleteCollection(); - }) - } - - queryHandler.set("keys", (query, perm, data) => { - if (!perm.read) - throw noperm; - return query.keys(); - }) - - queryHandler.set("get", (query, perm, data) => { - if (!perm.read) - throw noperm; - return query.get(); - }) - - queryHandler.set("set", (query, perm, data) => { - if (!perm.write) - throw noperm; - return query.set(data, {}); - }) - - queryHandler.set("update", (query, perm, data) => { - if (!perm.write) - throw noperm; - return query.update(data); - }) - - queryHandler.set("push", (query, perm, data) => { - if (!perm.write) - throw noperm; - return query.push(data); - }) - - queryHandler.set("delete", (query, perm, data) => { - if (!perm.write) - throw noperm; - return query.delete(); - }) - - queryHandler.set("subscribe", async (query, perm, data) => { - if (!perm.read) - throw noperm; - - let subscriptionID = nanoid(); - - query.subscribe((data) => { - socket.send(JSON.stringify({ ns: "event", data: { id: subscriptionID, data } })); - }, data.types, data.options); - - return StoreQuery(subscriptionID); - }) - - queryHandler.set("unsubscribe", async (query, perm, data) => { - query.unsubscribe(); - return DeleteQuery(true); - }) - handler.set("query", async ({ id, type, path, data }: QueryData) => { //TODO: Handle case with no id, type, path - Logging.debug(`Request with id '${id}' from type '${type}' and path '${path}' with data`, data) + Logging.debug(`Request with id '${id}' from type '${type}' and path '${path.join("/")}' with data`, data) try { if (!db) throw new Error("Database not found!"); else { - let handler = queryHandler.get(type); - if (!handler) { + let isDoc = path.length % 2 === 0; + let handler = isDoc ? documentHandler.get(type) : collectionHandler.get(type); + + if (!handler && session.root) + handler = rootHandler.get(type); + + if (!handler) throw new Error("Invalid Request!"); - } else { - const query = stored.get(id) || db.getQuery(path || [], session.sessionid); - const perms = db.rules.hasPermission(path, session); - let res = await handler(query, perms, data); - if (res && typeof res === "object" && res[StoreSym] !== undefined) { - if (res[StoreSym]) - stored.set(id, query); - else - stored.delete(id); + let query = db.getQuery(path || [], session.sessionid, isDoc ? "document" : "collection"); + let res = await handler({ + id, + data, + socket, + query: query as any // We know it is the right one + }) - res = res.result; - } - answer(id, res); + if (res && typeof res === "object" && res[StoreSym] !== undefined) { + if (res[StoreSym]) + stored.set(id, query); + else + stored.delete(id); + + res = res.result; } + answer(id, res); } } catch (err) { // Logging.error(err); @@ -215,10 +155,103 @@ export class ConnectionManager { } }) - socket.on("disconnect", () => { - stored.forEach(query => query.unsubscribe()); + socket.on("close", () => { + Logging.log(`${session.sessionid} has disconnected!`); + Logging.debug("Clearing stored:", stored); + stored.forEach(query => (query as DocumentQuery | CollectionQuery).unsubscribe()); stored.clear(); socket.removeAllListeners(); }) } -} \ No newline at end of file +} + +type QueryHandler = (api: { + id: string; + query: T; + // storedQuery(id: string): T | undefined; + socket: WebSocket; + data: any; +}) => any; + +const NoPermissionError = new Error("No permisison!"); + +const rootHandler = new Map>(); +rootHandler.set("collections", ({ query }) => { + return CollectionQuery.fromQuery(query).collections(); +}) + +rootHandler.set("delete-collection", ({ query }) => { + return CollectionQuery.fromQuery(query).deleteCollection(); +}) + + +const documentHandler = new Map>(); +documentHandler.set("get", ({ query }) => { + return query.get(); +}) + +documentHandler.set("set", ({ query, data }) => { + return query.set(data, {}); +}) + +documentHandler.set("update", ({ query, data }) => { + return query.update(data); +}) + +documentHandler.set("delete", ({ query }) => { + return query.delete(); +}) + +documentHandler.set("snapshot", async ({ query, data, id, socket }) => { + let res = await query.snapshot((data) => { + socket.send(JSON.stringify({ + ns: "snapshot", data: { id, data } + })); + }); + + return StoreQuery(res); +}) + +documentHandler.set("unsubscribe", async ({ query }) => { + query.unsubscribe(); + return DeleteQuery(true); +}) + + +const collectionHandler = new Map>(); +collectionHandler.set("keys", ({ query }) => { + return query.keys(); +}) + + +collectionHandler.set("add", ({ query, data }) => { + return query.add(data); +}) + +collectionHandler.set("get", ({ query, data }) => { + if (data.where) + query.where = data.where; + if (data.limit) + query.limit = data.limit; + return query.get(); +}) + +collectionHandler.set("snapshot", async ({ query, id, socket, data }) => { + if (data.where) + query.where = data.where; + if (data.limit) + query.limit = data.limit; + + let res = await query.snapshot((data) => { + socket.send(JSON.stringify({ + ns: "snapshot", data: { id, data } + })); + }); + + return StoreQuery(res); +}) + +collectionHandler.set("unsubscribe", async ({ query }) => { + query.unsubscribe(); + return DeleteQuery(true); +}) \ No newline at end of file diff --git a/src/database/database.ts b/src/database/database.ts index d1d45a5..d08fed1 100644 --- a/src/database/database.ts +++ b/src/database/database.ts @@ -2,7 +2,7 @@ import { Rules } from "./rules"; import Settings from "../settings"; import getLevelDB, { LevelDB, deleteLevelDB } from "../storage"; import DocumentLock from "./lock"; -import Query from "./query"; +import { DocumentQuery, CollectionQuery, Query } from "./query"; export class DatabaseManager { static databases = new Map(); @@ -104,8 +104,13 @@ export class Database { } - getQuery(path: string[], sender: string) { - return new Query(this, path, sender); + getQuery(path: string[], sender: string, type: "document" | "collection" | "any") { + if (type === "document") + return new DocumentQuery(this, path, sender); + else if (type === "collection") + return new CollectionQuery(this, path, sender); + else + return new Query(this, path, sender); } async stop() { diff --git a/src/database/lock.ts b/src/database/lock.ts index 4388dd7..2f53dc0 100644 --- a/src/database/lock.ts +++ b/src/database/lock.ts @@ -19,7 +19,7 @@ export default class DocumentLock { return () => { if (l.length > 0) - setImmediate(() => l.shift()); + setImmediate(() => l.shift()()); else this.locks.delete(key) } diff --git a/src/database/query.ts b/src/database/query.ts index c7d39d2..a62182a 100644 --- a/src/database/query.ts +++ b/src/database/query.ts @@ -14,7 +14,7 @@ interface ISubscribeOptions { existing: boolean; } -export default class Query { +export class Query { /** * Returns true if the path only contains valid characters and false if it doesn't * @param path Path to be checked @@ -23,19 +23,17 @@ export default class Query { return path.every(e => (e.match(/[^a-zA-Z0-9_\-\<\>]/g) || []).length === 0); } - constructor(private database: Database, private path: string[], private sender: string) { + constructor(protected database: Database, protected path: string[], protected sender: string) { if (path.length > 10) { throw new Error("Path is to long. Path is only allowed to be 10 Layers deep!"); } if (!this.validatePath(path)) { throw new Error("Path can only contain a-z A-Z 0-9 '-' '-' '<' and '>' "); } - - this.onChange = this.onChange.bind(this); } - private async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> { + protected async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> { path = [...path]; // Create modifiable copy let collectionID: string = undefined; let documentKey = path.length % 2 === 0 ? path.pop() : undefined; @@ -60,15 +58,17 @@ export default class Query { }; } - private getKey(collection: string, document?: string) { + protected getKey(collection: string, document?: string) { return `${collection || ""}/${document || ""}`; } - private getDoc(collection: string, document: string) { - return this.database.data.get(this.getKey(collection, document), { asBuffer: true }).then(res => decode(res as Buffer)).catch(resNull); + protected getDoc(collection: string, document: string) { + return this.database.data + .get(this.getKey(collection, document), { asBuffer: true }) + .then(res => decode(res as Buffer)).catch(resNull); } - private sendChange(collection: string, document: string, type: ChangeTypes, data: any) { + protected sendChange(collection: string, document: string, type: ChangeTypes, data: any) { let change: Change = { type, document, @@ -77,6 +77,7 @@ export default class Query { } let s = this.database.changes.get(this.getKey(collection, document)) + if (s) s.forEach(e => setImmediate(() => e(change))) s = this.database.changes.get(this.getKey(collection)) @@ -85,46 +86,31 @@ export default class Query { } + protected static getConstructorParams(query: Query): [Database, string[], string] { + return [query.database, query.path, query.sender]; + } +} + +interface UpdateData { + [path: string]: { + type: "value" | "timestamp" | "increment" | "push"; + value: any; + } +} +export class DocumentQuery extends Query { + constructor(database: Database, path: string[], sender: string) { + super(database, path, sender); + this.onChange = this.onChange.bind(this); + } + public async get() { let { collection, document } = await this.resolve(this.path); if (!collection || !document) { return null; } - const lock = await this.database.locks.lock(collection, document); - return this.getDoc(collection, document).finally(() => lock()) - } - public async keys() { - let { collection, document } = await this.resolve(this.path); - if (document) - throw new Error("Keys only works on collections!"); - if (!collection) - throw new Error("There must be a collection"); - - - let gt = Buffer.from(this.getKey(collection) + " "); - gt[gt.length - 1] = 0; - - let lt = Buffer.alloc(gt.length); - lt.set(gt); - lt[gt.length - 1] = 0xFF; - - return new Promise((yes, no) => { - let keys = []; - const stream = this.database.data.createKeyStream({ - gt, - lt, - keyAsBuffer: false - }) - stream.on("data", (key: string) => { - let s = key.split("/", 2); - if (s.length > 1) - keys.push(s[1]); - }); - stream.on("end", () => yes(keys)); - stream.on("error", no); - }); + return this.getDoc(collection, document) } public async set(data: any, { merge = false }) { @@ -149,11 +135,79 @@ export default class Query { .finally(() => lock()) } - public async push(value: any) { - let id = nanoid(ALPHABET, 32); - let q = new Query(this.database, [...this.path, id], this.sender); - await q.set(value, {}); - return id; + public async update(updateData: UpdateData) { + let { collection, document } = await this.resolve(this.path, true); + if (!collection) { + throw new Error("There must be a collection!") + } + + if (!document) { + throw new Error("There must be a document key!") + } + + // Logging.debug(updateData); + + const lock = await this.database.locks.lock(collection, document); + try { + let data = await this.getDoc(collection, document); + let isNew = false + if (!data) { + isNew = true; + data = {}; + } + + for (let path in updateData) { + const toUpdate = updateData[path]; + let d = data; + let parts = path.split("."); + while (parts.length > 1) { + let seg = parts.shift(); + if (!data[seg]) + data[seg] = {} + d = data[seg]; + } + + const last = parts[0]; + + // Logging.debug(parts, last, d) + + switch (toUpdate.type) { + case "value": + d[last] = toUpdate.value; + break; + case "increment": + if (d[last] === undefined || d[last] === null) + d[last] = toUpdate.value; + else if (typeof d[last] !== "number") { + throw new Error("Field is no number!"); + } else { + d[last] += toUpdate.value; + } + break; + case "timestamp": + d[last] = new Date().valueOf(); + break; + case "push": + if (d[last] === undefined || d[last] === null) + d[last] = [toUpdate.value]; + else if (Array.isArray(d[last])) { + d[last].push(toUpdate.value); + } else { + throw new Error("Field is not array!"); + } + break; + default: + throw new Error("Invalid update type: " + toUpdate.type); + } + } + + this.database.data + .put(this.getKey(collection, document), encode(data)) + .then(() => this.sendChange(collection, document, isNew ? "added" : "modified", data)) + } finally { + lock(); + } + //TODO: Implement } public async delete() { @@ -167,7 +221,6 @@ export default class Query { throw new Error("There must be a document key!") } - const lock = await this.database.locks.lock(collection, document); return await this.database.data @@ -176,11 +229,294 @@ export default class Query { .finally(() => lock()) } - public async update(data: any) { - //TODO: Implement + + + private subscription: { + key: string, + onChange: (change: DocRes & { type: ChangeTypes }) => void + }; + + async snapshot(onChange: (change: DocRes & { type: ChangeTypes }) => void) { + if (this.subscription) + throw new Error("This query is already subscribed!"); + let { collection, document } = await this.resolve(this.path); + + let data = await this.getDoc(collection, document); + let key = this.getKey(collection, document); + this.subscription = { + key, + onChange + } + let s = this.database.changes.get(key); + if (!s) { + s = new Set(); + this.database.changes.set(key, s); + } + + s.add(this.onChange); + + return data; } - public async getCollections() { + onChange(change: Change) { + // if(change.sender === this.sender) + // return + this.subscription.onChange({ + id: change.document, + data: change.data, + type: change.type + }) + } + + unsubscribe() { + if (!this.subscription) + return; + let s = this.database.changes.get(this.subscription.key); + s.delete(this.onChange); + if (s.size <= 0) + this.database.changes.delete(this.subscription.key); + + this.subscription = undefined; + } + + public static fromQuery(query: Query) { + return new DocumentQuery(...Query.getConstructorParams(query)); + } +} + +type FieldPath = string; +type WhereFilterOp = + | '<' + | '<=' + | '==' + | '>=' + | '>' + | 'array-contains' + | 'in' + | 'array-contains-any'; + +interface IQueryWhere { + fieldPath: FieldPath, + opStr: WhereFilterOp, + value: any +} + + +interface DocRes { + id: string; + data: any; +} + +export class CollectionQuery extends Query { + constructor(database: Database, path: string[], sender: string) { + super(database, path, sender); + this.onChange = this.onChange.bind(this); + } + + + public where: IQueryWhere[] = []; + public limit: number = -1; + + public async add(value: any) { + let id = nanoid(ALPHABET, 32); + let q = new DocumentQuery(this.database, [...this.path, id], this.sender); + await q.set(value, {}); + return id; + } + + private getStreamOptions(collection: string) { + let gt = Buffer.from(this.getKey(collection) + " "); + gt[gt.length - 1] = 0; + + let lt = Buffer.alloc(gt.length); + lt.set(gt); + lt[gt.length - 1] = 0xFF; + + return { + gt, + lt + } + } + + public async keys() { + let { collection, document } = await this.resolve(this.path); + if (document) + throw new Error("Keys only works on collections!"); + if (!collection) + throw new Error("There must be a collection"); + + return new Promise((yes, no) => { + let keys = []; + const stream = this.database.data.createKeyStream({ + ...this.getStreamOptions(collection), + keyAsBuffer: false + }) + stream.on("data", (key: string) => { + let s = key.split("/", 2); + if (s.length > 1) + keys.push(s[1]); + }); + stream.on("end", () => yes(keys)); + stream.on("error", no); + }); + } + + private getFieldValue(data: any, path: FieldPath) { + let parts = path.split("."); + let d = data; + while (parts.length > 0) { + let seg = parts.shift(); + + d = data[seg]; + if (d === undefined || d === null) + break; // Undefined/Null has no other fields! + } + return d; + } + + private fitsWhere(data: any): boolean { + if (this.where.length > 0) { + return this.where.every(where => { + let val = this.getFieldValue(data, where.fieldPath); + switch (where.opStr) { + case "<": + return val < where.value; + case "<=": + return val <= where.value; + case "==": + return val == where.value; + case ">=": + return val >= where.value; + case ">": + return val > where.value; + case "array-contains": + if (Array.isArray(val)) { + return val.some(e => e === where.value); + } + + break; + // case "array-contains-any": + // case "in": + + default: + throw new Error("Invalid where operation " + where.opStr); + } + }) + } + return true; + } + + async get() { + let { collection, document } = await this.resolve(this.path); + if (document) + throw new Error("Keys only works on collections!"); + if (!collection) + throw new Error("There must be a collection"); + return new Promise((yes, no) => { + const stream = this.database.data.iterator({ + ...this.getStreamOptions(collection), + keyAsBuffer: false, + valueAsBuffer: true + }) + + let values: DocRes[] = []; + + const onValue = (err: Error, key: string, value: Buffer) => { + if (err) { + no(err); + stream.end(err => Logging.error(err)); + } + else { + if (!key && !value) { + // END + Logging.debug("Checked all!") + yes(values); + } else { + let s = key.split("/", 2); + if (s.length <= 1) + return; + + const id = s[1]; + + let data = decode(value); + if (this.fitsWhere(data)) { + Logging.debug("Found fitting") + if (this.limit < 0 || value.length < this.limit) { + values.push({ + id, + data + }); + + } + else { + stream.end((err) => err ? no(err) : yes(values)) + return; + } + } + + stream.next(onValue); + } + } + } + + stream.next(onValue) + }) + } + + private subscription: { + key: string, + onChange: (change: (DocRes & { type: ChangeTypes })[]) => void + }; + + async snapshot(onChange: (change: (DocRes & { type: ChangeTypes })[]) => void) { + if (this.subscription) + throw new Error("This query is already subscribed!"); + let { collection, document } = await this.resolve(this.path); + + let data = await this.get(); + + let key = this.getKey(collection, document); + this.subscription = { + key, + onChange + } + let s = this.database.changes.get(key); + if (!s) { + s = new Set(); + this.database.changes.set(key, s); + } + + s.add(this.onChange); + + return data; + } + + onChange(change: Change) { + // if(change.sender === this.sender) + // return + + if (this.fitsWhere(change.data)) { + this.subscription.onChange([{ + id: change.document, + data: change.data, + type: change.type + }]) + } + } + + unsubscribe() { + if (!this.subscription) + return; + let s = this.database.changes.get(this.subscription.key); + s.delete(this.onChange); + if (s.size <= 0) + this.database.changes.delete(this.subscription.key); + + this.subscription = undefined; + } + + + public async collections() { return new Promise((yes, no) => { let keys = []; const stream = this.database.data.createKeyStream({ keyAsBuffer: false }) @@ -197,12 +533,14 @@ export default class Query { throw new Error("There can be no document defined on this operation"); } + //TODO: Lock whole collection! + let batch = this.database.data.batch(); try { if (collection) { - let keys = await this.keys(); - for (let key in keys) { - batch.del(key); + let documents = await this.keys(); + for (let document in documents) { + batch.del(this.getKey(collection, document)); } await batch.write(); batch = undefined; @@ -214,80 +552,7 @@ export default class Query { } } - - - private subscription: { - key: string; - type: Set; - send: (data: Omit) => void; - }; - async subscribe(send: (data: Omit) => void, type: ChangeTypes[] | undefined, _options: Partial) { - let options: ISubscribeOptions = { - existing: true, - ..._options - } - - let { collection, document } = await this.resolve(this.path); - - let key = this.getKey(collection, document); - let s = this.database.changes.get(key) || new Set(); - s.add(this.onChange) - this.database.changes.set(key, s); - - this.subscription = { - key, - send, - type: new Set(type || []) - }; - - Logging.debug("Existing?", options.existing) - - if (options.existing) { - if (document) { - send({ - document: document, - type: "added", - data: await this.get(), - }) - } else { - await this.keys().then(async documents => { - for (let document of documents) { - const data = await this.getDoc(collection, document); - if (data) - send({ - type: "added", - document, - data - }) - } - }) - } - } - - Logging.debug("Subscribed"); - } - - async onChange(change: Change) { - const { type, send } = this.subscription; - - // Events from the sender are handled locally - if (change.sender !== this.sender && type.has(change.type)) { - let c = { ...change }; - delete c.sender; - send(c) - } - } - - unsubscribe() { - if (!this.subscription) { - const { key } = this.subscription; - let s = this.database.changes.get(key); - if (s) { - s.delete(this.onChange); - if (s.size <= 0) - this.database.changes.delete(key); - } - this.subscription = undefined; - } + public static fromQuery(query: Query) { + return new CollectionQuery(...Query.getConstructorParams(query)); } } \ No newline at end of file