A lot of changes

This commit is contained in:
Fabian Stamm 2019-11-06 23:27:29 +01:00
parent d8c55d4389
commit 3fa209c0cf
5 changed files with 524 additions and 221 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@hibas123/realtimedb",
"version": "2.0.0-beta.1",
"version": "2.0.0-beta.2",
"description": "",
"main": "lib/index.js",
"private": true,

View File

@ -2,7 +2,7 @@ import * as WebSocket from "ws";
import { Server, IncomingMessage } from "http";
import { DatabaseManager } from "./database/database";
import Logging from "@hibas123/logging";
import Query from "./database/query";
import { Query, CollectionQuery, DocumentQuery } from "./database/query";
import Session from "./database/session";
import nanoid = require("nanoid");
@ -100,100 +100,40 @@ export class ConnectionManager {
const handler = new Map<string, ((data: any) => void)>();
type QueryData = { id: string, type: QueryTypes, path: string[], data: any, options: any };
const queryHandler = new Map<string, (query: Query, permissions: { read: boolean, write: boolean }, data?: any) => Promise<any>>();
const noperm = new Error("No permisison!");
if (session.root) {
queryHandler.set("collections", (query, perm, data) => {
return query.getCollections();
})
queryHandler.set("delete-collection", (query, perm, data) => {
return query.deleteCollection();
})
}
queryHandler.set("keys", (query, perm, data) => {
if (!perm.read)
throw noperm;
return query.keys();
})
queryHandler.set("get", (query, perm, data) => {
if (!perm.read)
throw noperm;
return query.get();
})
queryHandler.set("set", (query, perm, data) => {
if (!perm.write)
throw noperm;
return query.set(data, {});
})
queryHandler.set("update", (query, perm, data) => {
if (!perm.write)
throw noperm;
return query.update(data);
})
queryHandler.set("push", (query, perm, data) => {
if (!perm.write)
throw noperm;
return query.push(data);
})
queryHandler.set("delete", (query, perm, data) => {
if (!perm.write)
throw noperm;
return query.delete();
})
queryHandler.set("subscribe", async (query, perm, data) => {
if (!perm.read)
throw noperm;
let subscriptionID = nanoid();
query.subscribe((data) => {
socket.send(JSON.stringify({ ns: "event", data: { id: subscriptionID, data } }));
}, data.types, data.options);
return StoreQuery(subscriptionID);
})
queryHandler.set("unsubscribe", async (query, perm, data) => {
query.unsubscribe();
return DeleteQuery(true);
})
handler.set("query", async ({ id, type, path, data }: QueryData) => {
//TODO: Handle case with no id, type, path
Logging.debug(`Request with id '${id}' from type '${type}' and path '${path}' with data`, data)
Logging.debug(`Request with id '${id}' from type '${type}' and path '${path.join("/")}' with data`, data)
try {
if (!db)
throw new Error("Database not found!");
else {
let handler = queryHandler.get(type);
if (!handler) {
let isDoc = path.length % 2 === 0;
let handler = isDoc ? documentHandler.get(type) : collectionHandler.get(type);
if (!handler && session.root)
handler = rootHandler.get(type);
if (!handler)
throw new Error("Invalid Request!");
} else {
const query = stored.get(id) || db.getQuery(path || [], session.sessionid);
const perms = db.rules.hasPermission(path, session);
let res = await handler(query, perms, data);
if (res && typeof res === "object" && res[StoreSym] !== undefined) {
if (res[StoreSym])
stored.set(id, query);
else
stored.delete(id);
let query = db.getQuery(path || [], session.sessionid, isDoc ? "document" : "collection");
let res = await handler({
id,
data,
socket,
query: query as any // We know it is the right one
})
res = res.result;
}
answer(id, res);
if (res && typeof res === "object" && res[StoreSym] !== undefined) {
if (res[StoreSym])
stored.set(id, query);
else
stored.delete(id);
res = res.result;
}
answer(id, res);
}
} catch (err) {
// Logging.error(err);
@ -215,10 +155,103 @@ export class ConnectionManager {
}
})
socket.on("disconnect", () => {
stored.forEach(query => query.unsubscribe());
socket.on("close", () => {
Logging.log(`${session.sessionid} has disconnected!`);
Logging.debug("Clearing stored:", stored);
stored.forEach(query => (query as DocumentQuery | CollectionQuery).unsubscribe());
stored.clear();
socket.removeAllListeners();
})
}
}
type QueryHandler<T extends Query> = (api: {
id: string;
query: T;
// storedQuery(id: string): T | undefined;
socket: WebSocket;
data: any;
}) => any;
const NoPermissionError = new Error("No permisison!");
const rootHandler = new Map<string, QueryHandler<Query>>();
rootHandler.set("collections", ({ query }) => {
return CollectionQuery.fromQuery(query).collections();
})
rootHandler.set("delete-collection", ({ query }) => {
return CollectionQuery.fromQuery(query).deleteCollection();
})
const documentHandler = new Map<string, QueryHandler<DocumentQuery>>();
documentHandler.set("get", ({ query }) => {
return query.get();
})
documentHandler.set("set", ({ query, data }) => {
return query.set(data, {});
})
documentHandler.set("update", ({ query, data }) => {
return query.update(data);
})
documentHandler.set("delete", ({ query }) => {
return query.delete();
})
documentHandler.set("snapshot", async ({ query, data, id, socket }) => {
let res = await query.snapshot((data) => {
socket.send(JSON.stringify({
ns: "snapshot", data: { id, data }
}));
});
return StoreQuery(res);
})
documentHandler.set("unsubscribe", async ({ query }) => {
query.unsubscribe();
return DeleteQuery(true);
})
const collectionHandler = new Map<string, QueryHandler<CollectionQuery>>();
collectionHandler.set("keys", ({ query }) => {
return query.keys();
})
collectionHandler.set("add", ({ query, data }) => {
return query.add(data);
})
collectionHandler.set("get", ({ query, data }) => {
if (data.where)
query.where = data.where;
if (data.limit)
query.limit = data.limit;
return query.get();
})
collectionHandler.set("snapshot", async ({ query, id, socket, data }) => {
if (data.where)
query.where = data.where;
if (data.limit)
query.limit = data.limit;
let res = await query.snapshot((data) => {
socket.send(JSON.stringify({
ns: "snapshot", data: { id, data }
}));
});
return StoreQuery(res);
})
collectionHandler.set("unsubscribe", async ({ query }) => {
query.unsubscribe();
return DeleteQuery(true);
})

View File

@ -2,7 +2,7 @@ import { Rules } from "./rules";
import Settings from "../settings";
import getLevelDB, { LevelDB, deleteLevelDB } from "../storage";
import DocumentLock from "./lock";
import Query from "./query";
import { DocumentQuery, CollectionQuery, Query } from "./query";
export class DatabaseManager {
static databases = new Map<string, Database>();
@ -104,8 +104,13 @@ export class Database {
}
getQuery(path: string[], sender: string) {
return new Query(this, path, sender);
getQuery(path: string[], sender: string, type: "document" | "collection" | "any") {
if (type === "document")
return new DocumentQuery(this, path, sender);
else if (type === "collection")
return new CollectionQuery(this, path, sender);
else
return new Query(this, path, sender);
}
async stop() {

View File

@ -19,7 +19,7 @@ export default class DocumentLock {
return () => {
if (l.length > 0)
setImmediate(() => l.shift());
setImmediate(() => l.shift()());
else
this.locks.delete(key)
}

View File

@ -14,7 +14,7 @@ interface ISubscribeOptions {
existing: boolean;
}
export default class Query {
export class Query {
/**
* Returns true if the path only contains valid characters and false if it doesn't
* @param path Path to be checked
@ -23,19 +23,17 @@ export default class Query {
return path.every(e => (e.match(/[^a-zA-Z0-9_\-\<\>]/g) || []).length === 0);
}
constructor(private database: Database, private path: string[], private sender: string) {
constructor(protected database: Database, protected path: string[], protected sender: string) {
if (path.length > 10) {
throw new Error("Path is to long. Path is only allowed to be 10 Layers deep!");
}
if (!this.validatePath(path)) {
throw new Error("Path can only contain a-z A-Z 0-9 '-' '-' '<' and '>' ");
}
this.onChange = this.onChange.bind(this);
}
private async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> {
protected async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> {
path = [...path]; // Create modifiable copy
let collectionID: string = undefined;
let documentKey = path.length % 2 === 0 ? path.pop() : undefined;
@ -60,15 +58,17 @@ export default class Query {
};
}
private getKey(collection: string, document?: string) {
protected getKey(collection: string, document?: string) {
return `${collection || ""}/${document || ""}`;
}
private getDoc(collection: string, document: string) {
return this.database.data.get(this.getKey(collection, document), { asBuffer: true }).then(res => decode(res as Buffer)).catch(resNull);
protected getDoc(collection: string, document: string) {
return this.database.data
.get(this.getKey(collection, document), { asBuffer: true })
.then(res => decode<any>(res as Buffer)).catch(resNull);
}
private sendChange(collection: string, document: string, type: ChangeTypes, data: any) {
protected sendChange(collection: string, document: string, type: ChangeTypes, data: any) {
let change: Change = {
type,
document,
@ -77,6 +77,7 @@ export default class Query {
}
let s = this.database.changes.get(this.getKey(collection, document))
if (s)
s.forEach(e => setImmediate(() => e(change)))
s = this.database.changes.get(this.getKey(collection))
@ -85,46 +86,31 @@ export default class Query {
}
protected static getConstructorParams(query: Query): [Database, string[], string] {
return [query.database, query.path, query.sender];
}
}
interface UpdateData {
[path: string]: {
type: "value" | "timestamp" | "increment" | "push";
value: any;
}
}
export class DocumentQuery extends Query {
constructor(database: Database, path: string[], sender: string) {
super(database, path, sender);
this.onChange = this.onChange.bind(this);
}
public async get() {
let { collection, document } = await this.resolve(this.path);
if (!collection || !document) {
return null;
}
const lock = await this.database.locks.lock(collection, document);
return this.getDoc(collection, document).finally(() => lock())
}
public async keys() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
let gt = Buffer.from(this.getKey(collection) + " ");
gt[gt.length - 1] = 0;
let lt = Buffer.alloc(gt.length);
lt.set(gt);
lt[gt.length - 1] = 0xFF;
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.data.createKeyStream({
gt,
lt,
keyAsBuffer: false
})
stream.on("data", (key: string) => {
let s = key.split("/", 2);
if (s.length > 1)
keys.push(s[1]);
});
stream.on("end", () => yes(keys));
stream.on("error", no);
});
return this.getDoc(collection, document)
}
public async set(data: any, { merge = false }) {
@ -149,11 +135,79 @@ export default class Query {
.finally(() => lock())
}
public async push(value: any) {
let id = nanoid(ALPHABET, 32);
let q = new Query(this.database, [...this.path, id], this.sender);
await q.set(value, {});
return id;
public async update(updateData: UpdateData) {
let { collection, document } = await this.resolve(this.path, true);
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
// Logging.debug(updateData);
const lock = await this.database.locks.lock(collection, document);
try {
let data = await this.getDoc(collection, document);
let isNew = false
if (!data) {
isNew = true;
data = {};
}
for (let path in updateData) {
const toUpdate = updateData[path];
let d = data;
let parts = path.split(".");
while (parts.length > 1) {
let seg = parts.shift();
if (!data[seg])
data[seg] = {}
d = data[seg];
}
const last = parts[0];
// Logging.debug(parts, last, d)
switch (toUpdate.type) {
case "value":
d[last] = toUpdate.value;
break;
case "increment":
if (d[last] === undefined || d[last] === null)
d[last] = toUpdate.value;
else if (typeof d[last] !== "number") {
throw new Error("Field is no number!");
} else {
d[last] += toUpdate.value;
}
break;
case "timestamp":
d[last] = new Date().valueOf();
break;
case "push":
if (d[last] === undefined || d[last] === null)
d[last] = [toUpdate.value];
else if (Array.isArray(d[last])) {
d[last].push(toUpdate.value);
} else {
throw new Error("Field is not array!");
}
break;
default:
throw new Error("Invalid update type: " + toUpdate.type);
}
}
this.database.data
.put(this.getKey(collection, document), encode(data))
.then(() => this.sendChange(collection, document, isNew ? "added" : "modified", data))
} finally {
lock();
}
//TODO: Implement
}
public async delete() {
@ -167,7 +221,6 @@ export default class Query {
throw new Error("There must be a document key!")
}
const lock = await this.database.locks.lock(collection, document);
return await this.database.data
@ -176,11 +229,294 @@ export default class Query {
.finally(() => lock())
}
public async update(data: any) {
//TODO: Implement
private subscription: {
key: string,
onChange: (change: DocRes & { type: ChangeTypes }) => void
};
async snapshot(onChange: (change: DocRes & { type: ChangeTypes }) => void) {
if (this.subscription)
throw new Error("This query is already subscribed!");
let { collection, document } = await this.resolve(this.path);
let data = await this.getDoc(collection, document);
let key = this.getKey(collection, document);
this.subscription = {
key,
onChange
}
let s = this.database.changes.get(key);
if (!s) {
s = new Set();
this.database.changes.set(key, s);
}
s.add(this.onChange);
return data;
}
public async getCollections() {
onChange(change: Change) {
// if(change.sender === this.sender)
// return
this.subscription.onChange({
id: change.document,
data: change.data,
type: change.type
})
}
unsubscribe() {
if (!this.subscription)
return;
let s = this.database.changes.get(this.subscription.key);
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(this.subscription.key);
this.subscription = undefined;
}
public static fromQuery(query: Query) {
return new DocumentQuery(...Query.getConstructorParams(query));
}
}
type FieldPath = string;
type WhereFilterOp =
| '<'
| '<='
| '=='
| '>='
| '>'
| 'array-contains'
| 'in'
| 'array-contains-any';
interface IQueryWhere {
fieldPath: FieldPath,
opStr: WhereFilterOp,
value: any
}
interface DocRes {
id: string;
data: any;
}
export class CollectionQuery extends Query {
constructor(database: Database, path: string[], sender: string) {
super(database, path, sender);
this.onChange = this.onChange.bind(this);
}
public where: IQueryWhere[] = [];
public limit: number = -1;
public async add(value: any) {
let id = nanoid(ALPHABET, 32);
let q = new DocumentQuery(this.database, [...this.path, id], this.sender);
await q.set(value, {});
return id;
}
private getStreamOptions(collection: string) {
let gt = Buffer.from(this.getKey(collection) + " ");
gt[gt.length - 1] = 0;
let lt = Buffer.alloc(gt.length);
lt.set(gt);
lt[gt.length - 1] = 0xFF;
return {
gt,
lt
}
}
public async keys() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.data.createKeyStream({
...this.getStreamOptions(collection),
keyAsBuffer: false
})
stream.on("data", (key: string) => {
let s = key.split("/", 2);
if (s.length > 1)
keys.push(s[1]);
});
stream.on("end", () => yes(keys));
stream.on("error", no);
});
}
private getFieldValue(data: any, path: FieldPath) {
let parts = path.split(".");
let d = data;
while (parts.length > 0) {
let seg = parts.shift();
d = data[seg];
if (d === undefined || d === null)
break; // Undefined/Null has no other fields!
}
return d;
}
private fitsWhere(data: any): boolean {
if (this.where.length > 0) {
return this.where.every(where => {
let val = this.getFieldValue(data, where.fieldPath);
switch (where.opStr) {
case "<":
return val < where.value;
case "<=":
return val <= where.value;
case "==":
return val == where.value;
case ">=":
return val >= where.value;
case ">":
return val > where.value;
case "array-contains":
if (Array.isArray(val)) {
return val.some(e => e === where.value);
}
break;
// case "array-contains-any":
// case "in":
default:
throw new Error("Invalid where operation " + where.opStr);
}
})
}
return true;
}
async get() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
return new Promise<DocRes[]>((yes, no) => {
const stream = this.database.data.iterator({
...this.getStreamOptions(collection),
keyAsBuffer: false,
valueAsBuffer: true
})
let values: DocRes[] = [];
const onValue = (err: Error, key: string, value: Buffer) => {
if (err) {
no(err);
stream.end(err => Logging.error(err));
}
else {
if (!key && !value) {
// END
Logging.debug("Checked all!")
yes(values);
} else {
let s = key.split("/", 2);
if (s.length <= 1)
return;
const id = s[1];
let data = decode(value);
if (this.fitsWhere(data)) {
Logging.debug("Found fitting")
if (this.limit < 0 || value.length < this.limit) {
values.push({
id,
data
});
}
else {
stream.end((err) => err ? no(err) : yes(values))
return;
}
}
stream.next(onValue);
}
}
}
stream.next(onValue)
})
}
private subscription: {
key: string,
onChange: (change: (DocRes & { type: ChangeTypes })[]) => void
};
async snapshot(onChange: (change: (DocRes & { type: ChangeTypes })[]) => void) {
if (this.subscription)
throw new Error("This query is already subscribed!");
let { collection, document } = await this.resolve(this.path);
let data = await this.get();
let key = this.getKey(collection, document);
this.subscription = {
key,
onChange
}
let s = this.database.changes.get(key);
if (!s) {
s = new Set();
this.database.changes.set(key, s);
}
s.add(this.onChange);
return data;
}
onChange(change: Change) {
// if(change.sender === this.sender)
// return
if (this.fitsWhere(change.data)) {
this.subscription.onChange([{
id: change.document,
data: change.data,
type: change.type
}])
}
}
unsubscribe() {
if (!this.subscription)
return;
let s = this.database.changes.get(this.subscription.key);
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(this.subscription.key);
this.subscription = undefined;
}
public async collections() {
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.data.createKeyStream({ keyAsBuffer: false })
@ -197,12 +533,14 @@ export default class Query {
throw new Error("There can be no document defined on this operation");
}
//TODO: Lock whole collection!
let batch = this.database.data.batch();
try {
if (collection) {
let keys = await this.keys();
for (let key in keys) {
batch.del(key);
let documents = await this.keys();
for (let document in documents) {
batch.del(this.getKey(collection, document));
}
await batch.write();
batch = undefined;
@ -214,80 +552,7 @@ export default class Query {
}
}
private subscription: {
key: string;
type: Set<ChangeTypes>;
send: (data: Omit<Change, "sender">) => void;
};
async subscribe(send: (data: Omit<Change, "sender">) => void, type: ChangeTypes[] | undefined, _options: Partial<ISubscribeOptions>) {
let options: ISubscribeOptions = {
existing: true,
..._options
}
let { collection, document } = await this.resolve(this.path);
let key = this.getKey(collection, document);
let s = this.database.changes.get(key) || new Set();
s.add(this.onChange)
this.database.changes.set(key, s);
this.subscription = {
key,
send,
type: new Set(type || [])
};
Logging.debug("Existing?", options.existing)
if (options.existing) {
if (document) {
send({
document: document,
type: "added",
data: await this.get(),
})
} else {
await this.keys().then(async documents => {
for (let document of documents) {
const data = await this.getDoc(collection, document);
if (data)
send({
type: "added",
document,
data
})
}
})
}
}
Logging.debug("Subscribed");
}
async onChange(change: Change) {
const { type, send } = this.subscription;
// Events from the sender are handled locally
if (change.sender !== this.sender && type.has(change.type)) {
let c = { ...change };
delete c.sender;
send(c)
}
}
unsubscribe() {
if (!this.subscription) {
const { key } = this.subscription;
let s = this.database.changes.get(key);
if (s) {
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(key);
}
this.subscription = undefined;
}
public static fromQuery(query: Query) {
return new CollectionQuery(...Query.getConstructorParams(query));
}
}