Switching to new Query mechanism

This commit is contained in:
Fabian Stamm 2019-11-14 17:28:07 +01:00
parent 72e5c0dedd
commit 0175822699
4 changed files with 132 additions and 135 deletions

View File

@ -1,6 +1,6 @@
import * as WebSocket from "ws"; import * as WebSocket from "ws";
import { Server, IncomingMessage } from "http"; import { Server, IncomingMessage } from "http";
import { DatabaseManager } from "./database/database"; import { DatabaseManager, IQuery, ITypedQuery } from "./database/database";
import Logging from "@hibas123/nodelogging"; import Logging from "@hibas123/nodelogging";
import { Query, CollectionQuery, DocumentQuery } from "./database/query"; import { Query, CollectionQuery, DocumentQuery } from "./database/query";
import Session from "./database/session"; import Session from "./database/session";
@ -36,7 +36,7 @@ function DeleteQuery(result?: any) {
import { URLSearchParams } from "url"; import { URLSearchParams } from "url";
type QueryTypes = "keys" | "get" | "set" | "update" | "delete" | "push" | "subscribe" | "unsubscribe"; // type QueryTypes = "keys" | "get" | "set" | "update" | "delete" | "push" | "subscribe" | "unsubscribe";
export class ConnectionManager { export class ConnectionManager {
static server: WebSocket.Server; static server: WebSocket.Server;
@ -53,7 +53,7 @@ export class ConnectionManager {
const session = new Session(nanoid()); const session = new Session(nanoid());
let query = new URLSearchParams(req.url.split("?").pop()); const query = new URL(req.url).searchParams;
const database = query.get("database"); const database = query.get("database");
const db = DatabaseManager.getDatabase(database); const db = DatabaseManager.getDatabase(database);
@ -98,47 +98,29 @@ export class ConnectionManager {
} }
const handler = new Map<string, ((data: any) => void)>(); const handler = new Map<string, ((data: any) => void)>();
type QueryData = { id: string, type: QueryTypes, path: string[], data: any, options: any };
handler.set("query", async ({ id, type, path, data }: QueryData) => { handler.set("v2", async ({ id, query }: { id: string, query: IQuery }) => db.run(query, session)
//TODO: Handle case with no id, type, path .then(res => answer(id, res))
Logging.debug(`Request with id '${id}' from type '${type}' and path '${path.join("/")}' with data`, data) .catch(err => answer(id, undefined, err)));
try {
if (!db)
throw new Error("Database not found!");
else {
let isDoc = path.length % 2 === 0;
let handler = isDoc ? documentHandler.get(type) : collectionHandler.get(type);
if (!handler && session.root) const SnapshotMap = new Map<string, string>();
handler = rootHandler.get(type); handler.set("snapshot", async ({ id, query }: { id: string, query: ITypedQuery<"snapshot"> }) => {
db.snapshot(query, session, (data => {
if (!handler) socket.send(JSON.stringify({
throw new Error("Invalid Request!"); ns: "snapshot", data: { id, data }
}));
let query = db.getQuery(path || [], session, isDoc ? "document" : "collection"); })).then(s => {
let res = await handler({ answer(id, s.snaphot);
id, SnapshotMap.set(id, s.id);
data, }).catch(err => answer(id, undefined, err));
socket,
query: query as any // We know it is the right one
}) })
if (res && typeof res === "object" && res[StoreSym] !== undefined) { handler.set("unsubscribe", async ({ id }) => {
if (res[StoreSym]) let i = SnapshotMap.get(id);
stored.set(id, query); if (i) {
else db.unsubscribe(i, session);
stored.delete(id); SnapshotMap.delete(i);
res = res.result;
}
answer(id, res);
}
} catch (err) {
// Logging.error(err);
Logging.debug("Sending error:", err);
answer(id, err.message, true);
} }
}) })
@ -164,94 +146,3 @@ export class ConnectionManager {
}) })
} }
} }
type QueryHandler<T extends Query> = (api: {
id: string;
query: T;
// storedQuery(id: string): T | undefined;
socket: WebSocket;
data: any;
}) => any;
const NoPermissionError = new Error("No permisison!");
const rootHandler = new Map<string, QueryHandler<Query>>();
rootHandler.set("collections", ({ query }) => {
return CollectionQuery.fromQuery(query).collections();
})
rootHandler.set("delete-collection", ({ query }) => {
return CollectionQuery.fromQuery(query).deleteCollection();
})
const documentHandler = new Map<string, QueryHandler<DocumentQuery>>();
documentHandler.set("get", ({ query }) => {
return query.get();
})
documentHandler.set("set", ({ query, data }) => {
return query.set(data, {});
})
documentHandler.set("update", ({ query, data }) => {
return query.update(data);
})
documentHandler.set("delete", ({ query }) => {
return query.delete();
})
documentHandler.set("snapshot", async ({ query, data, id, socket }) => {
let res = await query.snapshot((data) => {
socket.send(JSON.stringify({
ns: "snapshot", data: { id, data }
}));
});
return StoreQuery(res);
})
documentHandler.set("unsubscribe", async ({ query }) => {
query.unsubscribe();
return DeleteQuery(true);
})
const collectionHandler = new Map<string, QueryHandler<CollectionQuery>>();
collectionHandler.set("keys", ({ query }) => {
return query.keys();
})
collectionHandler.set("add", ({ query, data }) => {
return query.add(data);
})
collectionHandler.set("get", ({ query, data }) => {
if (data.where)
query.where = data.where;
if (data.limit)
query.limit = data.limit;
return query.get();
})
collectionHandler.set("snapshot", async ({ query, id, socket, data }) => {
if (data.where)
query.where = data.where;
if (data.limit)
query.limit = data.limit;
let res = await query.snapshot((data) => {
socket.send(JSON.stringify({
ns: "snapshot", data: { id, data }
}));
});
return StoreQuery(res);
})
collectionHandler.set("unsubscribe", async ({ query }) => {
query.unsubscribe();
return DeleteQuery(true);
})

View File

@ -5,6 +5,25 @@ import DocumentLock from "./lock";
import { DocumentQuery, CollectionQuery, Query } from "./query"; import { DocumentQuery, CollectionQuery, Query } from "./query";
import Logging from "@hibas123/nodelogging"; import Logging from "@hibas123/nodelogging";
import Session from "./session"; import Session from "./session";
import nanoid = require("nanoid");
type IWriteQueries = "set" | "update" | "delete" | "add";
type ICollectionQueries = "get" | "add" | "keys" | "delete-collection" | "list";
type IDocumentQueries = "get" | "set" | "update" | "delete";
export interface ITypedQuery<T> {
path: string[];
type: T;
data?: any;
options?: any;
}
interface ITransaction {
queries: ITypedQuery<IWriteQueries>[];
}
export type IQuery = ITypedQuery<ICollectionQueries | IDocumentQueries>;
export class DatabaseManager { export class DatabaseManager {
static databases = new Map<string, Database>(); static databases = new Map<string, Database>();
@ -115,6 +134,82 @@ export class Database {
return new Query(this, path, session); return new Query(this, path, session);
} }
async run(query: IQuery, session: Session) {
const isCollection = query.path.length % 2 === 1;
if (isCollection) {
const q = new CollectionQuery(this, query.path, session);
let type = query.type as ICollectionQueries;
switch (type) {
case "add":
return q.add(query.data);
case "get":
const limit = (query.options || {}).limit;
if (limit)
q.limit = limit;
const where = (query.options || {}).where;
if (where)
q.where = where;
return q.get();
case "keys":
return q.keys();
case "list":
return q.collections();
case "delete-collection":
return q.deleteCollection();
default:
return Promise.reject(new Error("Invalid query!"));
}
} else {
const q = new DocumentQuery(this, query.path, session);
let type = query.type as IDocumentQueries;
switch (type) {
case "get":
return q.get();
case "set":
return q.set(query.data, query.options || {});
case "update":
return q.update(query.data);
case "delete":
return q.delete();
default:
return Promise.reject(new Error("Invalid query!"));
}
}
}
async snapshot(query: ITypedQuery<"snapshot">, session: Session, onchange: (change: any) => void) {
const isCollection = query.path.length % 2 === 1;
let q: DocumentQuery | CollectionQuery;
if (isCollection) {
q = new CollectionQuery(this, query.path, session);
const limit = (query.options || {}).limit;
if (limit)
q.limit = limit;
const where = (query.options || {}).where;
if (where)
q.where = where;
} else {
q = new DocumentQuery(this, query.path, session);
}
const id = nanoid(16);
session.queries.set(id, q);
return {
id,
snaphot: await q.snapshot(onchange)
};
}
async unsubscribe(id: string, session: Session) {
let query: CollectionQuery | DocumentQuery = session.queries.get(id) as any;
if (query) {
query.unsubscribe();
session.queries.delete(id);
}
}
async stop() { async stop() {
await this.data.close(); await this.data.close();
} }

View File

@ -379,6 +379,7 @@ export class CollectionQuery extends Query {
if (this.where.length > 0) { if (this.where.length > 0) {
return this.where.every(where => { return this.where.every(where => {
let val = this.getFieldValue(data, where.fieldPath); let val = this.getFieldValue(data, where.fieldPath);
Logging.debug("Value:", val);
switch (where.opStr) { switch (where.opStr) {
case "<": case "<":
return val < where.value; return val < where.value;
@ -395,10 +396,13 @@ export class CollectionQuery extends Query {
return val.some(e => e === where.value); return val.some(e => e === where.value);
} }
break; return false;
// case "array-contains-any": // case "array-contains-any":
// case "in": case "in":
if (typeof val === "object") {
return where.value in val;
}
return false;
default: default:
throw new Error("Invalid where operation " + where.opStr); throw new Error("Invalid where operation " + where.opStr);
} }
@ -459,6 +463,9 @@ export class CollectionQuery extends Query {
} }
stream.next(onValue) stream.next(onValue)
}).then(val => {
Logging.debug("Get returns:", val, ((this.where || [])[0] || {}));
return val;
}) })
} }

View File

@ -1,3 +1,5 @@
import { Query } from "./query";
export default class Session { export default class Session {
constructor(private _sessionid: string) { } constructor(private _sessionid: string) { }
get id() { get id() {
@ -5,4 +7,6 @@ export default class Session {
} }
root: boolean = false; root: boolean = false;
uid: string = undefined; uid: string = undefined;
queries = new Map<string, Query>();
} }