257 lines
7.1 KiB
TypeScript
257 lines
7.1 KiB
TypeScript
import * as WebSocket from "ws";
|
|
import { Server, IncomingMessage } from "http";
|
|
import { DatabaseManager } from "./database/database";
|
|
import Logging from "@hibas123/nodelogging";
|
|
import { Query, CollectionQuery, DocumentQuery } from "./database/query";
|
|
import Session from "./database/session";
|
|
import nanoid = require("nanoid");
|
|
|
|
import * as JWT from "jsonwebtoken";
|
|
|
|
/**
 * Verifies a JSON Web Token against the given public key.
 *
 * Verification errors reported through the `JWT.verify` callback are not
 * propagated: the returned promise resolves with `undefined` in that case.
 *
 * @param token     The encoded token to check.
 * @param publicKey Key handed to `JWT.verify` for the signature check.
 * @returns The decoded payload, or `undefined` if verification reported an error.
 */
async function verifyJWT(token: string, publicKey: string) {
  return new Promise<any | undefined>((resolve) => {
    JWT.verify(token, publicKey, (verifyError, payload) => {
      // Callers only distinguish "valid" from "not valid", so the error detail is dropped.
      resolve(verifyError ? undefined : payload);
    });
  });
}
|
|
|
|
/**
 * Marker key attached to handler results. The connection code checks it to
 * decide whether the query belonging to a request is kept (`true`) or
 * dropped (`false`) from the per-connection store.
 */
const StoreSym = Symbol("store");

/**
 * Wraps a handler result and flags it so that the query is stored.
 *
 * @param result Value that is sent back to the client.
 * @returns An object carrying `result` and the store marker set to `true`.
 */
function StoreQuery(result?: any) {
  const flagged = { result, [StoreSym]: true };
  return flagged;
}
|
|
|
|
/**
 * Wraps a handler result and flags it so that the stored query for the
 * request id is removed.
 *
 * @param result Value that is sent back to the client.
 * @returns An object carrying `result` and the store marker set to `false`.
 */
function DeleteQuery(result?: any) {
  const flagged = { result, [StoreSym]: false };
  return flagged;
}
|
|
|
|
import { URLSearchParams } from "url";
|
|
|
|
/**
 * Request types a client may send in a "query" message.
 *
 * Includes every type that has a handler registered in this file
 * ("snapshot", "add", "collections", "delete-collection" were missing).
 * "push" and "subscribe" are kept for backward compatibility although no
 * handler is registered for them here.
 */
type QueryTypes =
  | "keys"
  | "get"
  | "set"
  | "update"
  | "delete"
  | "add"
  | "push"
  | "snapshot"
  | "subscribe"
  | "unsubscribe"
  | "collections"
  | "delete-collection";
|
|
|
|
/**
 * Accepts WebSocket connections on top of an HTTP server, authenticates them
 * from the connection URL's query parameters and dispatches incoming "query"
 * messages to the document, collection and root handlers.
 */
export class ConnectionManager {
  /** WebSocket server created by `bind`. */
  static server: WebSocket.Server;

  /**
   * Creates the WebSocket server on the given HTTP server and starts
   * handling incoming connections.
   *
   * @param server HTTP server the WebSocket server attaches to.
   */
  static bind(server: Server) {
    this.server = new WebSocket.Server({ server });
    this.server.on("connection", this.onConnection.bind(this));
  }

  /**
   * Sets up a single client connection.
   *
   * Reads `database`, `accesskey`, `authkey` and `rootkey` from the request
   * URL. The socket is closed with an error message when the database is
   * unknown, the access key does not match, or the JWT is invalid.
   *
   * @param socket The freshly connected client socket.
   * @param req    The HTTP upgrade request; only its URL is used.
   */
  private static async onConnection(socket: WebSocket, req: IncomingMessage) {
    Logging.log("New Connection:");
    const sendError = (error: string) => socket.send(JSON.stringify({ ns: "error_msg", data: error }));

    const session = new Session(nanoid());

    // Only the part after "?" is a query string; a URL without one has no parameters.
    const url = req.url || "";
    const queryStart = url.indexOf("?");
    const params = new URLSearchParams(queryStart >= 0 ? url.slice(queryStart + 1) : "");

    const database = params.get("database");
    const db = DatabaseManager.getDatabase(database);
    if (!db) {
      sendError("Invalid Database!");
      socket.close();
      return;
    }

    // The access key is only enforced when the database defines one.
    const accesskey = params.get("accesskey");
    if (db.accesskey) {
      if (!accesskey || accesskey !== db.accesskey) {
        sendError("Unauthorized!");
        socket.close();
        return;
      }
    }

    // A JWT is only checked when the client sent one and the database has a public key.
    const authkey = params.get("authkey");
    if (authkey && db.publickey) {
      const res = await verifyJWT(authkey, db.publickey);
      if (!res || !res.uid) {
        sendError("Invalid JWT");
        socket.close();
        return;
      } else {
        session.uid = res.uid;
      }
    }

    // A wrong root key does not reject the connection; it just stays non-root.
    const rootkey = params.get("rootkey");
    if (rootkey && db.rootkey) {
      if (rootkey === db.rootkey) {
        session.root = true;
        Logging.warning(`Somebody logged into ${database} via rootkey`);
      }
    }

    // Queries kept alive for this connection, keyed by the request id.
    const stored = new Map<string, Query>();
    const answer = (id: string, data: any, error: boolean = false) => {
      socket.send(JSON.stringify({ ns: "message", data: { id, error, data } }));
    };

    const handlers = new Map<string, (data: any) => void | Promise<void>>();
    type QueryData = { id: string, type: QueryTypes, path: string[], data: any, options: any };

    handlers.set("query", async (request: QueryData) => {
      // Tolerate a message without a data object instead of throwing while destructuring.
      const { id, type, path, data } = request || ({} as QueryData);

      try {
        // A missing or malformed path is answered as an error instead of crashing the handler.
        if (!Array.isArray(path))
          throw new Error("Invalid Request!");

        Logging.debug(`Request with id '${id}' from type '${type}' and path '${path.join("/")}' with data`, data);

        // Even path length addresses a document, odd length a collection.
        const isDoc = path.length % 2 === 0;
        let handler = isDoc ? documentHandler.get(type) : collectionHandler.get(type);

        // Root-only operations are a fallback and only visible to root sessions.
        if (!handler && session.root)
          handler = rootHandler.get(type);

        if (!handler)
          throw new Error("Invalid Request!");

        const dbQuery = db.getQuery(path, session, isDoc ? "document" : "collection");
        let res = await handler({
          id,
          data,
          socket,
          query: dbQuery as any // We know it is the right one
        });

        // Handlers wrap their result with StoreQuery/DeleteQuery to control the store.
        // NOTE(review): on delete the stored query object itself is not unsubscribed here;
        // confirm that the handler's unsubscribe() on its own query releases the subscription.
        if (res && typeof res === "object" && res[StoreSym] !== undefined) {
          if (res[StoreSym])
            stored.set(id, dbQuery);
          else
            stored.delete(id);

          res = res.result;
        }
        answer(id, res);
      } catch (err) {
        // Logging.error(err);
        Logging.debug("Sending error:", err);
        answer(id, err instanceof Error ? err.message : String(err), true);
      }
    });

    socket.on("message", async (rawData: string) => {
      try {
        const message: { ns: string, data: any } = JSON.parse(rawData);
        const h = handlers.get(message.ns);
        if (h) {
          // Await so that a rejection from an async handler ends up in the catch below.
          await h(message.data);
        }
      } catch (err) {
        Logging.errorMessage("Unknown Error:");
        Logging.error(err);
      }
    });

    socket.on("close", () => {
      Logging.log(`${session.id} has disconnected!`);
      Logging.debug("Clearing stored:", stored);
      stored.forEach(query => (query as DocumentQuery | CollectionQuery).unsubscribe());
      stored.clear();
      socket.removeAllListeners();
    });
  }
}
|
|
|
|
/**
 * Argument object passed to every query handler.
 */
interface QueryHandlerApi<T extends Query> {
  /** Request id supplied by the client. */
  id: string;
  /** Query object the handler operates on. */
  query: T;
  // storedQuery(id: string): T | undefined;
  /** Socket of the requesting client. */
  socket: WebSocket;
  /** Payload sent along with the request. */
  data: any;
}

/**
 * Signature of a request handler for a given query kind. The return value
 * (possibly a promise) is what gets sent back to the client.
 */
type QueryHandler<T extends Query> = (api: QueryHandlerApi<T>) => any;
|
|
|
|
/** Shared error instance for requests the session is not allowed to perform. */
const NoPermissionError = new Error("No permission!");
|
|
|
|
/**
 * Handlers for root-only request types, looked up by request type name.
 * Each one converts the incoming query with `CollectionQuery.fromQuery`
 * before running the operation.
 */
const rootHandler = new Map<string, QueryHandler<Query>>();

// Calls `collections()` on the converted query.
rootHandler.set("collections", ({ query }) => CollectionQuery.fromQuery(query).collections());

// Calls `deleteCollection()` on the converted query.
rootHandler.set("delete-collection", ({ query }) => CollectionQuery.fromQuery(query).deleteCollection());
|
|
|
|
|
|
/**
 * Handlers for requests that address a document, looked up by request type name.
 */
const documentHandler = new Map<string, QueryHandler<DocumentQuery>>();

// Plain pass-through operations on the document query.
documentHandler.set("get", ({ query }) => query.get());
documentHandler.set("set", ({ query, data }) => query.set(data, {}));
documentHandler.set("update", ({ query, data }) => query.update(data));
documentHandler.set("delete", ({ query }) => query.delete());

// Registers a snapshot callback that forwards every update to the client
// under the original request id, and asks the caller to store the query.
documentHandler.set("snapshot", async ({ query, id, socket }) => {
  const initial = await query.snapshot((update) => {
    const payload = JSON.stringify({ ns: "snapshot", data: { id, data: update } });
    socket.send(payload);
  });

  return StoreQuery(initial);
});

// Unsubscribes the query and asks the caller to drop the stored query for this id.
// NOTE(review): `query` is the object handed to this request, not the one kept from the
// earlier snapshot request — confirm unsubscribe() still releases that subscription.
documentHandler.set("unsubscribe", async ({ query }) => {
  query.unsubscribe();
  return DeleteQuery(true);
});
|
|
|
|
|
|
/**
 * Copies the optional `where` and `limit` request options onto a collection query.
 * Does nothing when the request carried no data object, so "get" and "snapshot"
 * work for requests sent without any options.
 */
function applyCollectionOptions(query: CollectionQuery, data: any): void {
  if (!data)
    return;
  if (data.where)
    query.where = data.where;
  if (data.limit)
    query.limit = data.limit;
}

/**
 * Handlers for requests that address a collection, looked up by request type name.
 */
const collectionHandler = new Map<string, QueryHandler<CollectionQuery>>();

collectionHandler.set("keys", ({ query }) => {
  return query.keys();
});

collectionHandler.set("add", ({ query, data }) => {
  return query.add(data);
});

// Applies the optional where/limit options, then reads the collection.
collectionHandler.set("get", ({ query, data }) => {
  applyCollectionOptions(query, data);
  return query.get();
});

// Applies the optional where/limit options, then registers a snapshot callback that
// forwards every update to the client under the original request id. The result is
// wrapped so that the caller stores the query.
collectionHandler.set("snapshot", async ({ query, id, socket, data }) => {
  applyCollectionOptions(query, data);

  const res = await query.snapshot((update) => {
    socket.send(JSON.stringify({
      ns: "snapshot", data: { id, data: update }
    }));
  });

  return StoreQuery(res);
});

// Unsubscribes the query and asks the caller to drop the stored query for this id.
// NOTE(review): `query` is the object handed to this request, not the one kept from the
// earlier snapshot request — confirm unsubscribe() still releases that subscription.
collectionHandler.set("unsubscribe", async ({ query }) => {
  query.unsubscribe();
  return DeleteQuery(true);
});