This repository has been archived on 2021-06-02. You can view files and clone it, but cannot push or open issues or pull requests.
Files
RealtimeDB-OLD/src/database/query.ts
2019-11-07 00:18:15 +01:00

558 lines
16 KiB
TypeScript

import { Database, Change, ChangeTypes } from "./database";
import { resNull } from "../storage";
import nanoid = require("nanoid/generate");
import Logging from "@hibas123/nodelogging";
import * as MSGPack from "what-the-pack";
export const MP = MSGPack.initialize(2 ** 20);
const ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
const { encode, decode } = MP;
interface ISubscribeOptions {
existing: boolean;
}
export class Query {
/**
* Returns true if the path only contains valid characters and false if it doesn't
* @param path Path to be checked
*/
private validatePath(path: string[]) {
return path.every(e => (e.match(/[^a-zA-Z0-9_\-\<\>]/g) || []).length === 0);
}
constructor(protected database: Database, protected path: string[], protected sender: string) {
if (path.length > 10) {
throw new Error("Path is to long. Path is only allowed to be 10 Layers deep!");
}
if (!this.validatePath(path)) {
throw new Error("Path can only contain a-z A-Z 0-9 '-' '-' '<' and '>' ");
}
}
protected async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> {
path = [...path]; // Create modifiable copy
let collectionID: string = undefined;
let documentKey = path.length % 2 === 0 ? path.pop() : undefined;
let key = path.join("/");
const lock = await this.database.collectionLocks.lock(key);
try {
collectionID = await this.database.collections.get(key).then(r => r.toString()).catch(resNull);
if (!collectionID && create) {
collectionID = nanoid(ALPHABET, 32);
await this.database.collections.put(key, collectionID);
}
} finally {
lock();
}
return {
collection: collectionID,
document: documentKey,
collectionKey: key
};
}
protected getKey(collection: string, document?: string) {
return `${collection || ""}/${document || ""}`;
}
protected getDoc(collection: string, document: string) {
return this.database.data
.get(this.getKey(collection, document), { asBuffer: true })
.then(res => decode<any>(res as Buffer)).catch(resNull);
}
protected sendChange(collection: string, document: string, type: ChangeTypes, data: any) {
let change: Change = {
type,
document,
data,
sender: this.sender
}
let s = this.database.changes.get(this.getKey(collection, document))
if (s)
s.forEach(e => setImmediate(() => e(change)))
s = this.database.changes.get(this.getKey(collection))
if (s)
s.forEach(e => setImmediate(() => e(change)))
}
protected static getConstructorParams(query: Query): [Database, string[], string] {
return [query.database, query.path, query.sender];
}
}
interface UpdateData {
[path: string]: {
type: "value" | "timestamp" | "increment" | "push";
value: any;
}
}
export class DocumentQuery extends Query {
constructor(database: Database, path: string[], sender: string) {
super(database, path, sender);
this.onChange = this.onChange.bind(this);
}
public async get() {
let { collection, document } = await this.resolve(this.path);
if (!collection || !document) {
return null;
}
return this.getDoc(collection, document)
}
public async set(data: any, { merge = false }) {
if (data === null)
return this.delete();
let { collection, document } = await this.resolve(this.path, true);
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
const lock = await this.database.locks.lock(collection, document);
let isNew = !(await this.getDoc(collection, document))
return this.database.data
.put(this.getKey(collection, document), encode(data))
.then(() => this.sendChange(collection, document, isNew ? "added" : "modified", data))
.finally(() => lock())
}
public async update(updateData: UpdateData) {
let { collection, document } = await this.resolve(this.path, true);
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
// Logging.debug(updateData);
const lock = await this.database.locks.lock(collection, document);
try {
let data = await this.getDoc(collection, document);
let isNew = false
if (!data) {
isNew = true;
data = {};
}
for (let path in updateData) {
const toUpdate = updateData[path];
let d = data;
let parts = path.split(".");
while (parts.length > 1) {
let seg = parts.shift();
if (!data[seg])
data[seg] = {}
d = data[seg];
}
const last = parts[0];
// Logging.debug(parts, last, d)
switch (toUpdate.type) {
case "value":
d[last] = toUpdate.value;
break;
case "increment":
if (d[last] === undefined || d[last] === null)
d[last] = toUpdate.value;
else if (typeof d[last] !== "number") {
throw new Error("Field is no number!");
} else {
d[last] += toUpdate.value;
}
break;
case "timestamp":
d[last] = new Date().valueOf();
break;
case "push":
if (d[last] === undefined || d[last] === null)
d[last] = [toUpdate.value];
else if (Array.isArray(d[last])) {
d[last].push(toUpdate.value);
} else {
throw new Error("Field is not array!");
}
break;
default:
throw new Error("Invalid update type: " + toUpdate.type);
}
}
this.database.data
.put(this.getKey(collection, document), encode(data))
.then(() => this.sendChange(collection, document, isNew ? "added" : "modified", data))
} finally {
lock();
}
//TODO: Implement
}
public async delete() {
let { collection, document } = await this.resolve(this.path);
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
const lock = await this.database.locks.lock(collection, document);
return await this.database.data
.del(`${collection}/${document}`)
.then(() => this.sendChange(collection, document, "deleted", null))
.finally(() => lock())
}
private subscription: {
key: string,
onChange: (change: DocRes & { type: ChangeTypes }) => void
};
async snapshot(onChange: (change: DocRes & { type: ChangeTypes }) => void) {
if (this.subscription)
throw new Error("This query is already subscribed!");
let { collection, document } = await this.resolve(this.path);
let data = await this.getDoc(collection, document);
let key = this.getKey(collection, document);
this.subscription = {
key,
onChange
}
let s = this.database.changes.get(key);
if (!s) {
s = new Set();
this.database.changes.set(key, s);
}
s.add(this.onChange);
return data;
}
onChange(change: Change) {
// if(change.sender === this.sender)
// return
this.subscription.onChange({
id: change.document,
data: change.data,
type: change.type
})
}
unsubscribe() {
if (!this.subscription)
return;
let s = this.database.changes.get(this.subscription.key);
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(this.subscription.key);
this.subscription = undefined;
}
public static fromQuery(query: Query) {
return new DocumentQuery(...Query.getConstructorParams(query));
}
}
type FieldPath = string;
type WhereFilterOp =
| '<'
| '<='
| '=='
| '>='
| '>'
| 'array-contains'
| 'in'
| 'array-contains-any';
interface IQueryWhere {
fieldPath: FieldPath,
opStr: WhereFilterOp,
value: any
}
interface DocRes {
id: string;
data: any;
}
export class CollectionQuery extends Query {
constructor(database: Database, path: string[], sender: string) {
super(database, path, sender);
this.onChange = this.onChange.bind(this);
}
public where: IQueryWhere[] = [];
public limit: number = -1;
public async add(value: any) {
let id = nanoid(ALPHABET, 32);
let q = new DocumentQuery(this.database, [...this.path, id], this.sender);
await q.set(value, {});
return id;
}
private getStreamOptions(collection: string) {
let gt = Buffer.from(this.getKey(collection) + " ");
gt[gt.length - 1] = 0;
let lt = Buffer.alloc(gt.length);
lt.set(gt);
lt[gt.length - 1] = 0xFF;
return {
gt,
lt
}
}
public async keys() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.data.createKeyStream({
...this.getStreamOptions(collection),
keyAsBuffer: false
})
stream.on("data", (key: string) => {
let s = key.split("/", 2);
if (s.length > 1)
keys.push(s[1]);
});
stream.on("end", () => yes(keys));
stream.on("error", no);
});
}
private getFieldValue(data: any, path: FieldPath) {
let parts = path.split(".");
let d = data;
while (parts.length > 0) {
let seg = parts.shift();
d = data[seg];
if (d === undefined || d === null)
break; // Undefined/Null has no other fields!
}
return d;
}
private fitsWhere(data: any): boolean {
if (this.where.length > 0) {
return this.where.every(where => {
let val = this.getFieldValue(data, where.fieldPath);
switch (where.opStr) {
case "<":
return val < where.value;
case "<=":
return val <= where.value;
case "==":
return val == where.value;
case ">=":
return val >= where.value;
case ">":
return val > where.value;
case "array-contains":
if (Array.isArray(val)) {
return val.some(e => e === where.value);
}
break;
// case "array-contains-any":
// case "in":
default:
throw new Error("Invalid where operation " + where.opStr);
}
})
}
return true;
}
async get() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
return new Promise<DocRes[]>((yes, no) => {
const stream = this.database.data.iterator({
...this.getStreamOptions(collection),
keyAsBuffer: false,
valueAsBuffer: true
})
let values: DocRes[] = [];
const onValue = (err: Error, key: string, value: Buffer) => {
if (err) {
no(err);
stream.end(err => Logging.error(err));
}
else {
if (!key && !value) {
// END
Logging.debug("Checked all!")
yes(values);
} else {
let s = key.split("/", 2);
if (s.length <= 1)
return;
const id = s[1];
let data = decode(value);
if (this.fitsWhere(data)) {
Logging.debug("Found fitting")
if (this.limit < 0 || value.length < this.limit) {
values.push({
id,
data
});
}
else {
stream.end((err) => err ? no(err) : yes(values))
return;
}
}
stream.next(onValue);
}
}
}
stream.next(onValue)
})
}
private subscription: {
key: string,
onChange: (change: (DocRes & { type: ChangeTypes })[]) => void
};
async snapshot(onChange: (change: (DocRes & { type: ChangeTypes })[]) => void) {
if (this.subscription)
throw new Error("This query is already subscribed!");
let { collection, document } = await this.resolve(this.path);
let data = await this.get();
let key = this.getKey(collection, document);
this.subscription = {
key,
onChange
}
let s = this.database.changes.get(key);
if (!s) {
s = new Set();
this.database.changes.set(key, s);
}
s.add(this.onChange);
return data;
}
onChange(change: Change) {
// if(change.sender === this.sender)
// return
if (this.fitsWhere(change.data)) {
this.subscription.onChange([{
id: change.document,
data: change.data,
type: change.type
}])
}
}
unsubscribe() {
if (!this.subscription)
return;
let s = this.database.changes.get(this.subscription.key);
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(this.subscription.key);
this.subscription = undefined;
}
public async collections() {
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.data.createKeyStream({ keyAsBuffer: false })
stream.on("data", (key: string) => keys.push(key.split("/")));
stream.on("end", () => yes(keys));
stream.on("error", no);
});
}
public async deleteCollection() {
const { collection, document, collectionKey } = await this.resolve(this.path);
if (document) {
throw new Error("There can be no document defined on this operation");
}
//TODO: Lock whole collection!
let batch = this.database.data.batch();
try {
if (collection) {
let documents = await this.keys();
for (let document in documents) {
batch.del(this.getKey(collection, document));
}
await batch.write();
batch = undefined;
await this.database.collections.del(collectionKey);
}
} finally {
if (batch)
batch.clear();
}
}
public static fromQuery(query: Query) {
return new CollectionQuery(...Query.getConstructorParams(query));
}
}