First basic version

This commit is contained in:
Fabian Stamm
2019-11-04 00:45:30 +01:00
parent a78e98a0c8
commit 68011e3866
10 changed files with 431 additions and 393 deletions

View File

@ -1,9 +1,8 @@
import { Rules } from "./rules";
import Settings from "../settings";
import getLevelDB from "../storage";
import PathLock from "./lock";
import getLevelDB, { LevelDB, deleteLevelDB } from "../storage";
import DocumentLock from "./lock";
import Query from "./query";
import { Observable } from "@hibas123/utils";
export class DatabaseManager {
static databases = new Map<string, Database>();
@ -36,27 +35,33 @@ export class DatabaseManager {
if (db) {
await Settings.deleteDatabase(name);
await db.stop();
await deleteLevelDB(db.name)
}
}
}
export enum ChangeTypes {
SET,
PUSH
}
export type ChangeTypes = "added" | "modified" | "deleted";
export type Change = {
data: any;
document: string;
type: ChangeTypes;
path: string[]
sender: string;
}
export class Database {
public level = getLevelDB(this.name);
public rules: Rules;
public locks = new PathLock()
public changeObservable = new Observable<Change>();
export class Database {
private level = getLevelDB(this.name);
get data() {
return this.level;
}
public rules: Rules;
public locks = new DocumentLock()
public changes = new Map<string, Set<(change: Change) => void>>();
toJSON() {
return {
@ -94,11 +99,11 @@ export class Database {
}
getQuery(path: string[]) {
return new Query(this, path);
getQuery(path: string[], sender: string) {
return new Query(this, path, sender);
}
async stop() {
await this.level.close();
await this.data.close();
}
}

View File

@ -1,43 +1,23 @@
export type Release = { release: () => void };
export default class PathLock {
locks: {
path: string[],
next: (() => void)[]
}[] = [];
export default class DocumentLock {
private locks = new Map<string, (() => void)[]>();
constructor() { }
async lock(path: string[]) {
let locks = this.locks.filter(lock => {
let idxs = Math.min(lock.path.length, path.length);
if (idxs === 0) return true;
for (let i = 0; i < idxs; i++) {
if (lock.path[i] !== path[i])
return false;
}
return true;
})
if (locks.length > 0) { // await till release
await Promise.all(locks.map(l => new Promise(res => l.next.push(res))))
} else {
let lock = {
path: path,
next: []
}
this.locks.push(lock);
locks = [lock];
async lock(collection: string = "", document: string = "") {
let key = collection + "/" + document;
let l = this.locks.get(key);
if (l)
await new Promise(resolve => { l.push(resolve); this.locks.set(key, l) });
else {
l = [];
this.locks.set(key, l);
}
return () => {
locks.forEach(lock => {
if (lock.next.length > 0) {
setImmediate(() => lock.next.shift()());
} else {
this.locks.splice(this.locks.indexOf(lock), 1);
}
})
if (l.length > 0)
setImmediate(() => l.shift());
else
this.locks.delete(key)
}
}
}

View File

@ -1,275 +1,265 @@
import { Database, Change, ChangeTypes } from "./database";
import Encoder, { DataTypes } from "@hibas123/binary-encoder";
import { resNull } from "../storage";
import { resNull, LevelDB } from "../storage";
import { LevelUpChain } from "levelup";
import shortid = require("shortid");
import Logging from "@hibas123/nodelogging";
import * as MSGPack from "what-the-pack";
enum FieldTypes {
OBJECT,
VALUE
export const MP = MSGPack.initialize(2 ** 20);
const { encode, decode } = MP;
interface ISubscribeOptions {
existing: boolean;
}
interface IField {
type: FieldTypes;
value?: any
}
export const FieldEncoder = new Encoder<IField>({
type: {
index: 1,
type: DataTypes.UINT8
},
value: {
index: 3,
type: DataTypes.AUTO,
allowNull: true
}
})
export default class Query {
constructor(private database: Database, private path: string[]) {
/**
* Returns true if the path only contains valid characters and false if it doesn't
* @param path Path to be checked
*/
private validatePath(path: string[]) {
return path.every(e => (e.match(/[^a-zA-Z0-9_\-\<\>]/g) || []).length === 0);
}
constructor(private database: Database, private path: string[], private sender: string) {
if (path.length > 10) {
throw new Error("Path is to long. Path is only allowed to be 10 Layers deep!");
}
if (path.find(segment => segment.indexOf("/") >= 0)) {
throw new Error("Path cannot contain '/'!");
if (!this.validatePath(path)) {
throw new Error("Path can only contain a-z A-Z 0-9 '-' '-' '<' and '>' ");
}
this.onChange = this.onChange.bind(this);
}
private pathToKey(path?: string[]) {
return "/" + (path || this.path).join("/");
private async resolve(path: string[], create = false): Promise<{ collection: string, document: string }> {
path = [...path]; // Create modifiable copy
let collectionID: string = undefined;
let documentKey = undefined;
while (path.length > 0) {
let collectionName = path.shift();
let key = `${collectionID || ""}/${documentKey || ""}/${collectionName}`;
let lock = await this.database.locks.lock(collectionID, documentKey);
try {
collectionID = await this.database.data.get(key).then(r => r.toString()).catch(resNull);
if (!collectionID) {
if (create) {
collectionID = shortid.generate();
await this.database.data.put(key, Buffer.from(collectionID));
} else {
return { collection: null, document: null };
}
}
if (path.length > 0)
documentKey = path.shift();
else
documentKey = undefined;
} finally {
lock();
}
}
return {
collection: collectionID,
document: documentKey
};
}
private getField(path: string[]): Promise<IField | null> {
return this.database.level.get(this.pathToKey(path), { asBuffer: true }).then((res: Buffer) => FieldEncoder.decode(res)).catch(resNull);
private getKey(collection: string, document?: string) {
return `${collection || ""}/${document || ""}`;
}
private getFields(path: string[]) {
let p = this.pathToKey(path);
if (!p.endsWith("/"))
p += "/";
let t = Buffer.from(p);
private getDoc(collection: string, document: string) {
return this.database.data.get(this.getKey(collection, document), { asBuffer: true }).then(res => decode(res as Buffer)).catch(resNull);
}
let gt = Buffer.alloc(t.length + 1);
gt.set(t);
gt[t.length] = 0;
private sendChange(collection: string, document: string, type: ChangeTypes, data: any) {
let change: Change = {
type,
document,
data,
sender: this.sender
}
let lt = Buffer.alloc(t.length + 1);
lt.set(t);
lt[t.length] = 0xFF;
let s = this.database.changes.get(this.getKey(collection, document))
if (s)
s.forEach(e => setImmediate(() => e(change)))
s = this.database.changes.get(this.getKey(collection))
if (s)
s.forEach(e => setImmediate(() => e(change)))
}
public async get() {
let { collection, document } = await this.resolve(this.path);
if (!collection || !document) {
return null;
}
const lock = await this.database.locks.lock(collection, document);
return this.getDoc(collection, document).finally(() => lock())
}
public async keys() {
let { collection, document } = await this.resolve(this.path);
if (document)
throw new Error("Keys only works on collections!");
if (!collection)
throw new Error("There must be a collection");
let gt = Buffer.from(this.getKey(collection) + " ");
gt[gt.length - 1] = 0;
let lt = Buffer.alloc(gt.length);
lt.set(gt);
lt[gt.length - 1] = 0xFF;
return new Promise<string[]>((yes, no) => {
let keys = [];
const stream = this.database.level.createKeyStream({
gt: Buffer.from(p),
lt: Buffer.from(lt)
const stream = this.database.data.createKeyStream({
gt,
lt
})
stream.on("data", key => keys.push(key.toString()));
stream.on("data", (key: string | Buffer) => {
key = key.toString();
let s = key.split("/");
if (s.length === 2)
keys.push(s[1]);
});
stream.on("end", () => yes(keys));
stream.on("error", no);
});
}
async keys() {
const lock = await this.database.locks.lock(this.path);
try {
let obj = await this.getField(this.path);
if (!obj)
return [];
let fields = await this.getFields(this.path);
return fields.map(field => field.split("/").filter(e => e !== "")).filter(path => path.length === this.path.length + 1).map(path => path.pop());
} finally {
lock()
public async set(data: any, { merge = false }) {
if (data === null)
return this.delete();
let { collection, document } = await this.resolve(this.path, true);
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
const lock = await this.database.locks.lock(collection, document);
let isNew = await this.database.data.get(this.getKey(collection, document)).catch(resNull).then(e => e !== null);
return this.database.data
.put(this.getKey(collection, document), encode(data))
.then(() => this.sendChange(collection, document, isNew ? "added" : "modified", data))
.finally(() => lock())
}
async get() {
const lock = await this.database.locks.lock(this.path);
try {
const getData = async (path: string[]) => {
let obj = await this.getField(path);
if (!obj)
return null;
else {
if (obj.type === FieldTypes.VALUE) {
return obj.value;
} else {
let fields = await this.getFields(this.path);
let sorted = fields.map(field => field.split("/").filter(e => e !== "")).sort((a, b) => a.length - b.length)
let res = {};
for (let path of sorted) {
let field = await this.getField(path);
let shortened = path.slice(this.path.length);
let t = res;
for (let section of shortened.slice(0, -1)) {
t = t[section];
}
if (field.type === FieldTypes.OBJECT) {
t[path[path.length - 1]] = {};
} else {
t[path[path.length - 1]] = field.value;
}
}
return res;
}
}
}
return await getData(this.path);
} finally {
lock();
}
}
async push(value: any) {
public async push(value: any) {
let id = shortid.generate();
let q = new Query(this.database, [...this.path, id]);
await q.set(value);
this.database.changeObservable.send({
path: [...this.path, id],
type: ChangeTypes.PUSH
})
let q = new Query(this.database, [...this.path, id], this.sender);
await q.set(value, {});
return id;
}
async set(value: any) {
const lock = await this.database.locks.lock(this.path);
let batch = this.database.level.batch();
try {
if (value === null || value === undefined) {
this.delete(value);
} else {
let field = await this.getField(this.path);
if (field) {
await this.delete(batch);
} else {
for (let i = 0; i < this.path.length; i++) {
let subpath = this.path.slice(0, i);
let field = await this.getField(subpath);
if (!field) {
batch.put(this.pathToKey(subpath), FieldEncoder.encode({
type: FieldTypes.OBJECT
}));
} else if (field.type !== FieldTypes.OBJECT) {
throw new Error("Parent elements not all Object. Cannot set value!");
}
}
}
public async delete() {
let { collection, document } = await this.resolve(this.path);
const saveValue = (path: string[], value: any) => {
if (typeof value === "object") {
//TODO: Handle case array!
// Field type array?
batch.put(this.pathToKey(path), FieldEncoder.encode({
type: FieldTypes.OBJECT
}))
for (let field in value) {
saveValue([...path, field], value[field]);
}
} else {
batch.put(this.pathToKey(path), FieldEncoder.encode({
type: FieldTypes.VALUE,
value
}));
}
}
saveValue(this.path, value);
}
await batch.write();
this.database.changeObservable.send({
path: this.path,
type: ChangeTypes.SET
})
} catch (err) {
if (batch.length > 0)
batch.clear();
throw err;
} finally {
lock();
if (!collection) {
throw new Error("There must be a collection!")
}
if (!document) {
throw new Error("There must be a document key!")
}
const lock = await this.database.locks.lock(collection, document);
return await this.database.data
.del(`${collection}/${document}`)
.then(() => this.sendChange(collection, document, "deleted", null))
.finally(() => lock())
}
private async delete(batch?: LevelUpChain) {
let lock = batch ? undefined : await this.database.locks.lock(this.path);
const commit = batch ? false : true;
if (!batch)
batch = this.database.level.batch();
try {
let field = await this.getField(this.path);
if (field) {
let fields = await this.getFields(this.path)
fields.forEach(field => batch.del(field));
batch.del(this.pathToKey(this.path));
}
if (commit)
await batch.write();
} catch (err) {
if (batch.length > 0)
batch.clear()
} finally {
if (lock)
lock()
}
public async update(data: any) {
//TODO: Implement
}
subscription: {
type: ChangeTypes;
send: (data: any) => void;
key: string;
type: Set<ChangeTypes>;
send: (data: Omit<Change, "sender">) => void;
};
subscribe(type: "set" | "push", send: (data: any) => void) {
async subscribe(send: (data: Omit<Change, "sender">) => void, type: ChangeTypes[] | undefined, _options: Partial<ISubscribeOptions>) {
let options: ISubscribeOptions = {
existing: true,
..._options
}
let { collection, document } = await this.resolve(this.path);
let key = this.getKey(collection, document);
let s = this.database.changes.get(key) || new Set();
s.add(this.onChange)
this.database.changes.set(key, s);
this.subscription = {
key,
send,
type: type === "set" ? ChangeTypes.SET : ChangeTypes.PUSH
type: new Set(type || [])
};
this.database.changeObservable.subscribe(this.onChange);
Logging.debug("Subscribe");
if (options.existing) {
if (document) {
send({
document: document,
type: "added",
data: await this.get(),
})
} else {
await this.keys().then(async documents => {
for (let document of documents) {
const data = await this.getDoc(collection, document);
if (data)
send({
type: "added",
document,
data
})
}
})
}
}
Logging.debug("Subscribed");
}
async onChange(change: Change) {
Logging.debug("Change:", change);
if (!this.subscription)
return this.database.changeObservable.unsubscribe(this.onChange);
const { type, send } = this.subscription;
if (type === change.type) {
Logging.debug("Path", this.path, change.path);
if (this.path.length <= change.path.length - (type === ChangeTypes.PUSH ? 1 : 0)) {
let valid = true;
for (let i = 0; i < this.path.length; i++) {
if (this.path[i] !== change.path[i]) {
valid = false;
break;
}
}
if (valid) {
Logging.debug("Send Change:", change);
if (type === ChangeTypes.PUSH) {
send({
id: change.path[change.path.length - 1],
path: change.path,
data: await new Query(this.database, change.path).get()
})
} else {
send(await this.get())
}
}
}
// Events from the sender are handled locally
if (change.sender !== this.sender && type.has(change.type)) {
let c = { ...change };
delete c.sender;
send(c)
}
}
unsubscribe() {
this.subscription = undefined;
this.database.changeObservable.unsubscribe(this.onChange);
if (!this.subscription) {
const { key } = this.subscription;
let s = this.database.changes.get(key);
if (s) {
s.delete(this.onChange);
if (s.size <= 0)
this.database.changes.delete(key);
}
this.subscription = undefined;
}
}
}

View File

@ -1,4 +1,8 @@
export default class Session {
constructor(private _sessionid: string) { }
get sessionid() {
return this._sessionid;
}
root: boolean = false;
uid: string = undefined;
}