Fixing several bugs and adding very basic rule support

This commit is contained in:
Fabian
2019-09-19 16:24:35 +02:00
parent 429ba7e291
commit a7f7edcd0b
8 changed files with 163 additions and 73 deletions

View File

@ -1,8 +1,7 @@
import { Database } from "./database";
import { Database, Change, ChangeTypes } from "./database";
import Encoder, { DataTypes } from "@hibas123/binary-encoder";
import { resNull } from "../storage";
import { Bytes } from "leveldown";
import { LevelUpChain } from "levelup";
import shortid = require("shortid");
import Logging from "@hibas123/nodelogging";
@ -14,20 +13,14 @@ enum FieldTypes {
interface IField {
type: FieldTypes;
// fields?: string[];
value?: any
}
const FieldEncoder = new Encoder<IField>({
export const FieldEncoder = new Encoder<IField>({
type: {
index: 1,
type: DataTypes.UINT8
},
// fields: {
// index: 2,
// type: DataTypes.STRING,
// array: true
// },
value: {
index: 3,
type: DataTypes.AUTO,
@ -43,6 +36,8 @@ export default class Query {
if (path.find(segment => segment.indexOf("/") >= 0)) {
throw new Error("Path cannot contain '/'!");
}
this.onChange = this.onChange.bind(this);
}
private pathToKey(path?: string[]) {
@ -90,15 +85,13 @@ export default class Query {
if (obj.type === FieldTypes.VALUE) {
return obj.value;
} else {
let res = {};
let fields = await this.getFields(this.path);
let a = fields.map(field => field.split("/").filter(e => e !== "")).sort((a, b) => a.length - b.length).map(async path => {
let field = await this.getField(path);
Logging.debug("Path:", path, "Field:", field);
let shortened = path.slice(this.path.length);
let sorted = fields.map(field => field.split("/").filter(e => e !== "")).sort((a, b) => a.length - b.length)
let res = {};
for (let path of sorted) {
let field = await this.getField(path);
let shortened = path.slice(this.path.length);
let t = res;
for (let section of shortened.slice(0, -1)) {
t = t[section];
@ -109,9 +102,7 @@ export default class Query {
} else {
t[path[path.length - 1]] = field.value;
}
})
await Promise.all(a);
}
return res;
}
@ -127,10 +118,17 @@ export default class Query {
let id = shortid.generate();
let q = new Query(this.database, [...this.path, id]);
await q.set(value);
this.database.changeObservable.send({
path: [...this.path, id],
type: ChangeTypes.PUSH
})
return id;
}
async set(value: any) {
if (value === null || value === undefined)
return this.delete(value);
const lock = await this.database.locks.lock(this.path);
let batch = this.database.level.batch();
try {
@ -152,7 +150,6 @@ export default class Query {
}
const saveValue = (path: string[], value: any) => {
Logging.debug("Save Value:", path, value);
if (typeof value === "object") {
//TODO: Handle case array!
// Field type array?
@ -173,6 +170,10 @@ export default class Query {
saveValue(this.path, value);
await batch.write();
this.database.changeObservable.send({
path: this.path,
type: ChangeTypes.SET
})
} catch (err) {
if (batch.length > 0)
batch.clear();
@ -182,7 +183,7 @@ export default class Query {
}
}
async delete(batch?: LevelUpChain) {
private async delete(batch?: LevelUpChain) {
let lock = batch ? undefined : await this.database.locks.lock(this.path);
const commit = batch ? false : true;
if (!batch)
@ -206,6 +207,54 @@ export default class Query {
}
}
async subscribe() { }
async unsubscribe() { }
subscription: {
type: ChangeTypes;
send: (data: any) => void;
};
subscribe(type: "set" | "push", send: (data: any) => void) {
this.subscription = {
send,
type: type === "set" ? ChangeTypes.SET : ChangeTypes.PUSH
};
this.database.changeObservable.subscribe(this.onChange);
Logging.debug("Subscribe");
}
async onChange(change: Change) {
Logging.debug("Change:", change);
if (!this.subscription)
return this.database.changeObservable.unsubscribe(this.onChange);
const { type, send } = this.subscription;
if (type === change.type) {
Logging.debug("Path", this.path, change.path);
if (this.path.length === change.path.length - (type === ChangeTypes.PUSH ? 1 : 0)) {
let valid = true;
for (let i = 0; i < this.path.length; i++) {
if (this.path[i] !== change.path[i]) {
valid = false;
break;
}
}
if (valid) {
Logging.debug("Send Change:", change);
if (type === ChangeTypes.PUSH) {
send({
id: change.path[change.path.length - 1],
data: await new Query(this.database, change.path).get()
})
} else {
send(await this.get())
}
}
}
}
}
unsubscribe() {
this.subscription = undefined;
this.database.changeObservable.unsubscribe(this.onChange);
}
}