This repository has been archived on 2021-06-02. You can view files and clone it, but cannot push or open issues or pull requests.
RealtimeDB-OLD/src/connection.ts

153 lines
4.8 KiB
TypeScript

import * as WebSocket from "ws";
import { Server, IncomingMessage } from "http";
import { DatabaseManager } from "./database/database";
import Logging from "@hibas123/logging";
import Query from "./database/query";
import Session from "./database/session";
import shortid = require("shortid");
import * as JWT from "jsonwebtoken";
async function verifyJWT(token: string, publicKey: string) {
return new Promise<any | undefined>((yes) => {
JWT.verify(token, publicKey, (err, decoded) => {
if (err)
yes(undefined);
else
yes(decoded);
})
})
}
import { URLSearchParams } from "url";
type QueryTypes = "get" | "set" | "push" | "subscribe" | "unsubscribe";
export class ConnectionManager {
static server: WebSocket.Server;
static bind(server: Server) {
this.server = new WebSocket.Server({ server });
this.server.on("connection", this.onConnection.bind(this));
}
private static async onConnection(socket: WebSocket, req: IncomingMessage) {
Logging.log("New Connection:");
const sendError = (msg: string) => socket.send(JSON.stringify({ ns: "error_msg", args: [msg] }));
const session = new Session();
let query = new URLSearchParams(req.url.split("?").pop());
const database = query.get("database");
const db = DatabaseManager.getDatabase(database);
if (!db) {
sendError("Invalid Database!");
socket.close();
return;
}
const accesskey = query.get("accesskey");
if (db.accesskey) {
if (!accesskey || accesskey !== db.accesskey) {
sendError("Unauthorized!");
socket.close();
return;
}
}
const authkey = query.get("authkey");
if (authkey && db.publickey) {
let res = await verifyJWT(authkey, db.publickey);
if (!res || !res.uid) {
sendError("Invalid JWT");
socket.close();
return;
} else {
session.uid = res.uid;
}
}
const rootkey = query.get("rootkey");
if (rootkey && db.rootkey) {
if (rootkey === db.rootkey) {
session.root = true;
Logging.warning(`Somebody logged into ${database} via rootkey`);
}
}
const stored = new Map<string, Query>();
const answer = (id: string, data: any, err: boolean = false) => {
socket.send(JSON.stringify({ ns: "message", args: [id, err, data] }));
}
const handler = new Map<string, ((...args: any[]) => void)>();
handler.set("query", async (id: string, type: QueryTypes, path: string[], data: any) => {
Logging.debug(`Request with id '${id}' from type '${type}' and path '${path}' with data`, data)
try {
const perms = db.rules.hasPermission(path, session);
const noperm = new Error("No permisison!");
if (!db)
answer(id, "Database not found!", true);
else {
const query = stored.get(id) || db.getQuery(path);
switch (type) {
case "get":
if (!perms.read)
throw noperm;
answer(id, await query.get());
return;
case "set":
if (!perms.write)
throw noperm;
answer(id, await query.set(data));
return;
case "push":
if (!perms.write)
throw noperm;
answer(id, await query.push(data));
return;
case "subscribe":
if (!perms.read)
throw noperm;
let subscriptionID = shortid.generate();
query.subscribe(data, (data) => {
socket.send(JSON.stringify({ ns: "event", args: [subscriptionID, data] }));
});
stored.set(id, query);
answer(id, subscriptionID);
return;
case "unsubscribe":
query.unsubscribe();
stored.delete(id);
answer(id, true);
return;
}
answer(id, "Invalid request!", true);
}
} catch (err) {
Logging.error(err);
answer(id, err.message, true);
}
})
socket.on("message", (rawData: string) => {
let data: { ns: string, args: any[] } = JSON.parse(rawData);
let h = handler.get(data.ns);
if (h) {
h(...data.args);
}
})
socket.on("disconnect", () => {
stored.forEach(query => query.unsubscribe());
stored.clear();
socket.removeAllListeners();
})
}
}