From 01758226991c75edea50897a7931bf57915c52b2 Mon Sep 17 00:00:00 2001 From: Fabian Stamm Date: Thu, 14 Nov 2019 17:28:07 +0100 Subject: [PATCH] Switching to new Query mechanism --- src/connection.ts | 155 ++++++--------------------------------- src/database/database.ts | 95 ++++++++++++++++++++++++ src/database/query.ts | 13 +++- src/database/session.ts | 4 + 4 files changed, 132 insertions(+), 135 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 2a188a4..12e2854 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1,6 +1,6 @@ import * as WebSocket from "ws"; import { Server, IncomingMessage } from "http"; -import { DatabaseManager } from "./database/database"; +import { DatabaseManager, IQuery, ITypedQuery } from "./database/database"; import Logging from "@hibas123/nodelogging"; import { Query, CollectionQuery, DocumentQuery } from "./database/query"; import Session from "./database/session"; @@ -36,7 +36,7 @@ function DeleteQuery(result?: any) { import { URLSearchParams } from "url"; -type QueryTypes = "keys" | "get" | "set" | "update" | "delete" | "push" | "subscribe" | "unsubscribe"; +// type QueryTypes = "keys" | "get" | "set" | "update" | "delete" | "push" | "subscribe" | "unsubscribe"; export class ConnectionManager { static server: WebSocket.Server; @@ -53,7 +53,7 @@ export class ConnectionManager { const session = new Session(nanoid()); - let query = new URLSearchParams(req.url.split("?").pop()); + const query = new URL(req.url).searchParams; const database = query.get("database"); const db = DatabaseManager.getDatabase(database); @@ -98,47 +98,29 @@ export class ConnectionManager { } const handler = new Map void)>(); - type QueryData = { id: string, type: QueryTypes, path: string[], data: any, options: any }; - handler.set("query", async ({ id, type, path, data }: QueryData) => { - //TODO: Handle case with no id, type, path - Logging.debug(`Request with id '${id}' from type '${type}' and path '${path.join("/")}' with data`, data) + handler.set("v2", async ({ id, query }: { id: string, query: IQuery }) => db.run(query, session) + .then(res => answer(id, res)) + .catch(err => answer(id, undefined, err))); - try { - if (!db) - throw new Error("Database not found!"); - else { - let isDoc = path.length % 2 === 0; - let handler = isDoc ? documentHandler.get(type) : collectionHandler.get(type); - if (!handler && session.root) - handler = rootHandler.get(type); + const SnapshotMap = new Map(); + handler.set("snapshot", async ({ id, query }: { id: string, query: ITypedQuery<"snapshot"> }) => { + db.snapshot(query, session, (data => { + socket.send(JSON.stringify({ + ns: "snapshot", data: { id, data } + })); + })).then(s => { + answer(id, s.snaphot); + SnapshotMap.set(id, s.id); + }).catch(err => answer(id, undefined, err)); + }) - if (!handler) - throw new Error("Invalid Request!"); - - let query = db.getQuery(path || [], session, isDoc ? "document" : "collection"); - let res = await handler({ - id, - data, - socket, - query: query as any // We know it is the right one - }) - - if (res && typeof res === "object" && res[StoreSym] !== undefined) { - if (res[StoreSym]) - stored.set(id, query); - else - stored.delete(id); - - res = res.result; - } - answer(id, res); - } - } catch (err) { - // Logging.error(err); - Logging.debug("Sending error:", err); - answer(id, err.message, true); + handler.set("unsubscribe", async ({ id }) => { + let i = SnapshotMap.get(id); + if (i) { + db.unsubscribe(i, session); + SnapshotMap.delete(i); } }) @@ -163,95 +145,4 @@ export class ConnectionManager { socket.removeAllListeners(); }) } -} - -type QueryHandler = (api: { - id: string; - query: T; - // storedQuery(id: string): T | undefined; - socket: WebSocket; - data: any; -}) => any; - -const NoPermissionError = new Error("No permisison!"); - -const rootHandler = new Map>(); -rootHandler.set("collections", ({ query }) => { - return CollectionQuery.fromQuery(query).collections(); -}) - -rootHandler.set("delete-collection", ({ query }) => { - return CollectionQuery.fromQuery(query).deleteCollection(); -}) - - -const documentHandler = new Map>(); -documentHandler.set("get", ({ query }) => { - return query.get(); -}) - -documentHandler.set("set", ({ query, data }) => { - return query.set(data, {}); -}) - -documentHandler.set("update", ({ query, data }) => { - return query.update(data); -}) - -documentHandler.set("delete", ({ query }) => { - return query.delete(); -}) - -documentHandler.set("snapshot", async ({ query, data, id, socket }) => { - let res = await query.snapshot((data) => { - socket.send(JSON.stringify({ - ns: "snapshot", data: { id, data } - })); - }); - - return StoreQuery(res); -}) - -documentHandler.set("unsubscribe", async ({ query }) => { - query.unsubscribe(); - return DeleteQuery(true); -}) - - -const collectionHandler = new Map>(); -collectionHandler.set("keys", ({ query }) => { - return query.keys(); -}) - - -collectionHandler.set("add", ({ query, data }) => { - return query.add(data); -}) - -collectionHandler.set("get", ({ query, data }) => { - if (data.where) - query.where = data.where; - if (data.limit) - query.limit = data.limit; - return query.get(); -}) - -collectionHandler.set("snapshot", async ({ query, id, socket, data }) => { - if (data.where) - query.where = data.where; - if (data.limit) - query.limit = data.limit; - - let res = await query.snapshot((data) => { - socket.send(JSON.stringify({ - ns: "snapshot", data: { id, data } - })); - }); - - return StoreQuery(res); -}) - -collectionHandler.set("unsubscribe", async ({ query }) => { - query.unsubscribe(); - return DeleteQuery(true); -}) \ No newline at end of file +} \ No newline at end of file diff --git a/src/database/database.ts b/src/database/database.ts index 5e27087..fb65f3a 100644 --- a/src/database/database.ts +++ b/src/database/database.ts @@ -5,6 +5,25 @@ import DocumentLock from "./lock"; import { DocumentQuery, CollectionQuery, Query } from "./query"; import Logging from "@hibas123/nodelogging"; import Session from "./session"; +import nanoid = require("nanoid"); + +type IWriteQueries = "set" | "update" | "delete" | "add"; +type ICollectionQueries = "get" | "add" | "keys" | "delete-collection" | "list"; +type IDocumentQueries = "get" | "set" | "update" | "delete"; + +export interface ITypedQuery { + path: string[]; + type: T; + data?: any; + options?: any; +} + +interface ITransaction { + queries: ITypedQuery[]; +} + +export type IQuery = ITypedQuery; + export class DatabaseManager { static databases = new Map(); @@ -115,6 +134,82 @@ export class Database { return new Query(this, path, session); } + async run(query: IQuery, session: Session) { + const isCollection = query.path.length % 2 === 1; + if (isCollection) { + const q = new CollectionQuery(this, query.path, session); + let type = query.type as ICollectionQueries; + switch (type) { + case "add": + return q.add(query.data); + case "get": + const limit = (query.options || {}).limit; + if (limit) + q.limit = limit; + const where = (query.options || {}).where; + if (where) + q.where = where; + return q.get(); + case "keys": + return q.keys(); + case "list": + return q.collections(); + case "delete-collection": + return q.deleteCollection(); + default: + return Promise.reject(new Error("Invalid query!")); + } + } else { + const q = new DocumentQuery(this, query.path, session); + let type = query.type as IDocumentQueries; + switch (type) { + case "get": + return q.get(); + case "set": + return q.set(query.data, query.options || {}); + case "update": + return q.update(query.data); + case "delete": + return q.delete(); + default: + return Promise.reject(new Error("Invalid query!")); + } + } + } + + async snapshot(query: ITypedQuery<"snapshot">, session: Session, onchange: (change: any) => void) { + const isCollection = query.path.length % 2 === 1; + let q: DocumentQuery | CollectionQuery; + if (isCollection) { + q = new CollectionQuery(this, query.path, session); + const limit = (query.options || {}).limit; + if (limit) + q.limit = limit; + const where = (query.options || {}).where; + if (where) + q.where = where; + } else { + q = new DocumentQuery(this, query.path, session); + } + + const id = nanoid(16); + session.queries.set(id, q); + return { + id, + snaphot: await q.snapshot(onchange) + }; + } + + async unsubscribe(id: string, session: Session) { + let query: CollectionQuery | DocumentQuery = session.queries.get(id) as any; + if (query) { + query.unsubscribe(); + session.queries.delete(id); + } + } + + + async stop() { await this.data.close(); } diff --git a/src/database/query.ts b/src/database/query.ts index 108fadd..1b71338 100644 --- a/src/database/query.ts +++ b/src/database/query.ts @@ -379,6 +379,7 @@ export class CollectionQuery extends Query { if (this.where.length > 0) { return this.where.every(where => { let val = this.getFieldValue(data, where.fieldPath); + Logging.debug("Value:", val); switch (where.opStr) { case "<": return val < where.value; @@ -395,10 +396,13 @@ export class CollectionQuery extends Query { return val.some(e => e === where.value); } - break; + return false; // case "array-contains-any": - // case "in": - + case "in": + if (typeof val === "object") { + return where.value in val; + } + return false; default: throw new Error("Invalid where operation " + where.opStr); } @@ -459,6 +463,9 @@ export class CollectionQuery extends Query { } stream.next(onValue) + }).then(val => { + Logging.debug("Get returns:", val, ((this.where || [])[0] || {})); + return val; }) } diff --git a/src/database/session.ts b/src/database/session.ts index 2ddb016..92ecb7d 100644 --- a/src/database/session.ts +++ b/src/database/session.ts @@ -1,3 +1,5 @@ +import { Query } from "./query"; + export default class Session { constructor(private _sessionid: string) { } get id() { @@ -5,4 +7,6 @@ export default class Session { } root: boolean = false; uid: string = undefined; + + queries = new Map(); } \ No newline at end of file