Improvements in collection storage. Breaks compatibility with previous versions!
This commit is contained in:
@ -54,12 +54,17 @@ export class Database {
|
||||
private level = getLevelDB(this.name);
|
||||
|
||||
get data() {
|
||||
return this.level;
|
||||
return this.level.data;
|
||||
}
|
||||
|
||||
get collections() {
|
||||
return this.level.collection;
|
||||
}
|
||||
|
||||
|
||||
public rules: Rules;
|
||||
public locks = new DocumentLock()
|
||||
public collectionLocks = new DocumentLock()
|
||||
|
||||
public changes = new Map<string, Set<(change: Change) => void>>();
|
||||
|
||||
@ -89,8 +94,8 @@ export class Database {
|
||||
}
|
||||
|
||||
async setRootKey(key: string) {
|
||||
await Settings.setDatabaseAccessKey(this.name, key);
|
||||
this.accesskey = key;
|
||||
await Settings.setDatabaseRootKey(this.name, key);
|
||||
this.rootkey = key;
|
||||
}
|
||||
|
||||
async setPublicKey(key: string) {
|
||||
|
||||
@ -3,6 +3,10 @@ export type Release = { release: () => void };
|
||||
export default class DocumentLock {
|
||||
private locks = new Map<string, (() => void)[]>();
|
||||
|
||||
getLocks() {
|
||||
return Array.from(this.locks.keys())
|
||||
}
|
||||
|
||||
async lock(collection: string = "", document: string = "") {
|
||||
let key = collection + "/" + document;
|
||||
let l = this.locks.get(key);
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
import { Database, Change, ChangeTypes } from "./database";
|
||||
import { resNull } from "../storage";
|
||||
import shortid = require("shortid");
|
||||
import nanoid = require("nanoid/generate");
|
||||
import Logging from "@hibas123/nodelogging";
|
||||
import * as MSGPack from "what-the-pack";
|
||||
|
||||
export const MP = MSGPack.initialize(2 ** 20);
|
||||
|
||||
const ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
const { encode, decode } = MP;
|
||||
|
||||
interface ISubscribeOptions {
|
||||
@ -33,39 +35,28 @@ export default class Query {
|
||||
}
|
||||
|
||||
|
||||
private async resolve(path: string[], create = false): Promise<{ collection: string, document: string }> {
|
||||
private async resolve(path: string[], create = false): Promise<{ collection: string, document: string, collectionKey: string }> {
|
||||
path = [...path]; // Create modifiable copy
|
||||
let collectionID: string = undefined;
|
||||
let documentKey = undefined;
|
||||
while (path.length > 0) {
|
||||
let collectionName = path.shift();
|
||||
let key = `${collectionID || ""}/${documentKey || ""}/${collectionName}`;
|
||||
let lock = await this.database.locks.lock(collectionID, documentKey);
|
||||
try {
|
||||
collectionID = await this.database.data.get(key).then(r => r.toString()).catch(resNull);
|
||||
let documentKey = path.length % 2 === 0 ? path.pop() : undefined;
|
||||
let key = path.join("/");
|
||||
|
||||
if (!collectionID) {
|
||||
if (create) {
|
||||
collectionID = shortid.generate();
|
||||
await this.database.data.put(key, Buffer.from(collectionID));
|
||||
} else {
|
||||
return { collection: null, document: null };
|
||||
}
|
||||
}
|
||||
const lock = await this.database.collectionLocks.lock(key);
|
||||
|
||||
|
||||
if (path.length > 0)
|
||||
documentKey = path.shift();
|
||||
else
|
||||
documentKey = undefined;
|
||||
} finally {
|
||||
lock();
|
||||
try {
|
||||
collectionID = await this.database.collections.get(key).then(r => r.toString()).catch(resNull);
|
||||
if (!collectionID && create) {
|
||||
collectionID = nanoid(ALPHABET, 32);
|
||||
await this.database.collections.put(key, collectionID);
|
||||
}
|
||||
} finally {
|
||||
lock();
|
||||
}
|
||||
|
||||
return {
|
||||
collection: collectionID,
|
||||
document: documentKey
|
||||
document: documentKey,
|
||||
collectionKey: key
|
||||
};
|
||||
}
|
||||
|
||||
@ -123,12 +114,12 @@ export default class Query {
|
||||
let keys = [];
|
||||
const stream = this.database.data.createKeyStream({
|
||||
gt,
|
||||
lt
|
||||
lt,
|
||||
keyAsBuffer: false
|
||||
})
|
||||
stream.on("data", (key: string | Buffer) => {
|
||||
key = key.toString();
|
||||
let s = key.split("/");
|
||||
if (s.length === 2)
|
||||
stream.on("data", (key: string) => {
|
||||
let s = key.split("/", 2);
|
||||
if (s.length > 1)
|
||||
keys.push(s[1]);
|
||||
});
|
||||
stream.on("end", () => yes(keys));
|
||||
@ -159,7 +150,7 @@ export default class Query {
|
||||
}
|
||||
|
||||
public async push(value: any) {
|
||||
let id = shortid.generate();
|
||||
let id = nanoid(ALPHABET, 32);
|
||||
let q = new Query(this.database, [...this.path, id], this.sender);
|
||||
await q.set(value, {});
|
||||
return id;
|
||||
@ -189,7 +180,43 @@ export default class Query {
|
||||
//TODO: Implement
|
||||
}
|
||||
|
||||
subscription: {
|
||||
public async getCollections() {
|
||||
return new Promise<string[]>((yes, no) => {
|
||||
let keys = [];
|
||||
const stream = this.database.data.createKeyStream({ keyAsBuffer: false })
|
||||
stream.on("data", (key: string) => keys.push(key.split("/")));
|
||||
stream.on("end", () => yes(keys));
|
||||
stream.on("error", no);
|
||||
});
|
||||
}
|
||||
|
||||
public async deleteCollection() {
|
||||
const { collection, document, collectionKey } = await this.resolve(this.path);
|
||||
|
||||
if (document) {
|
||||
throw new Error("There can be no document defined on this operation");
|
||||
}
|
||||
|
||||
let batch = this.database.data.batch();
|
||||
try {
|
||||
if (collection) {
|
||||
let keys = await this.keys();
|
||||
for (let key in keys) {
|
||||
batch.del(key);
|
||||
}
|
||||
await batch.write();
|
||||
batch = undefined;
|
||||
await this.database.collections.del(collectionKey);
|
||||
}
|
||||
} finally {
|
||||
if (batch)
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private subscription: {
|
||||
key: string;
|
||||
type: Set<ChangeTypes>;
|
||||
send: (data: Omit<Change, "sender">) => void;
|
||||
@ -213,6 +240,8 @@ export default class Query {
|
||||
type: new Set(type || [])
|
||||
};
|
||||
|
||||
Logging.debug("Existing?", options.existing)
|
||||
|
||||
if (options.existing) {
|
||||
if (document) {
|
||||
send({
|
||||
|
||||
Reference in New Issue
Block a user