import Logging from "@hibas123/nodelogging";
import { IncomingMessage, Server } from "http";
import * as WebSocket from "ws";
import { DatabaseManager } from "./database/database";
import {
  CollectionQuery,
  DocumentQuery,
  IQuery,
  ITypedQuery,
} from "./database/query";
import Session from "./database/session";
import { verifyJWT } from "./helper/jwt";
import nanoid = require("nanoid");

/**
 * Accepts WebSocket connections on top of an HTTP server and dispatches the
 * JSON messages received on each socket to database operations.
 *
 * Connection parameters are read from the query string of the upgrade
 * request: `database` (required), `accesskey`, `authkey` (a JWT) and
 * `rootkey`.
 *
 * Incoming messages have the shape `{ ns, data }`, where `ns` selects the
 * handler (`"v2"`, `"snapshot"` or `"unsubscribe"`). Outgoing messages use
 * the namespaces `"message"`, `"snapshot"` and `"error_msg"`.
 */
export class WebsocketConnectionManager {
  static server: WebSocket.Server;

  /**
   * Creates the WebSocket server on top of `server` and starts handling
   * incoming connections.
   *
   * @param server HTTP server the WebSocket server attaches to.
   */
  static bind(server: Server) {
    this.server = new WebSocket.Server({ server });
    this.server.on("connection", (socket: WebSocket, req: IncomingMessage) => {
      // onConnection is async; without this catch a failure during setup
      // would surface as an unhandled rejection and leave the socket open
      // with no handlers attached.
      this.onConnection(socket, req).catch((err) => {
        Logging.errorMessage("Unknown Error:");
        Logging.error(err);
        socket.close();
      });
    });
  }

  /**
   * Authenticates a new connection and wires up its message handlers.
   *
   * The connection is refused (an `error_msg` is sent, then the socket is
   * closed) when the database is unknown, when the database has an access
   * key that does not match the supplied one, or when an `authkey` is
   * supplied for a database with a public key and fails verification.
   *
   * NOTE(review): the "message" listener is only attached after the awaited
   * JWT verification, so a message sent by the client before verification
   * finishes would not be handled - confirm clients wait before sending.
   *
   * @param socket The newly connected WebSocket.
   * @param req The HTTP upgrade request; only its URL is read.
   */
  private static async onConnection(socket: WebSocket, req: IncomingMessage) {
    Logging.log("New Connection:");

    const sendError = (error: string) =>
      socket.send(JSON.stringify({ ns: "error_msg", data: error }));

    // Reports the reason to the client and terminates the connection.
    const refuse = (reason: string) => {
      sendError(reason);
      socket.close();
    };

    const session = new Session(nanoid());

    // req.url is optional on IncomingMessage; an empty path simply yields
    // no query parameters and ends in "Invalid Database!".
    const query = new URL(req.url || "/", "http://localhost").searchParams;

    const database = query.get("database");
    const db = DatabaseManager.getDatabase(database);
    if (!db) {
      refuse("Invalid Database!");
      return;
    }

    const accesskey = query.get("accesskey");
    if (db.accesskey) {
      if (!accesskey || accesskey !== db.accesskey) {
        refuse("Unauthorized!");
        return;
      }
    }

    const authkey = query.get("authkey");
    if (authkey && db.publickey) {
      let verified = false;
      try {
        const res = await verifyJWT(authkey, db.publickey);
        if (res && res.uid) {
          session.uid = res.uid;
          verified = true;
        }
      } catch (err) {
        // A rejected verification is treated like a failed one.
        Logging.error(err);
      }
      if (!verified) {
        refuse("Invalid JWT");
        return;
      }
    }

    const rootkey = query.get("rootkey");
    if (rootkey && db.rootkey) {
      if (rootkey === db.rootkey) {
        session.root = true;
        Logging.warning(`Somebody logged into ${database} via rootkey`);
      }
    }

    // Sends the reply for request `id`. `error` is `false` on success and
    // the caught value on failure. Note that JSON.stringify turns plain
    // Error instances into `{}`, so only its truthiness reaches the client.
    const answer = (id: string, data: any, error: unknown = false) => {
      if (error) Logging.error(error as any);
      socket.send(
        JSON.stringify({ ns: "message", data: { id, error, data } })
      );
    };

    const handler = new Map<string, (data: any) => Promise<void>>();

    handler.set("v2", async ({ id, query }) =>
      db
        .run(Array.isArray(query) ? query : [query], session)
        .then((res) => answer(id, res))
        .catch((err) => answer(id, undefined, err))
    );

    // handler.set("bulk", async ({ id, query }) => db.run(query, session)
    //    .then(res => answer(id, res))
    //    .catch(err => answer(id, undefined, err))
    // );

    // Maps the request id of a snapshot subscription to the subscription id
    // returned by db.snapshot, so "unsubscribe" can look it up again.
    const snapshotIds = new Map();

    handler.set(
      "snapshot",
      async ({
        id,
        query,
      }: {
        id: string;
        query: ITypedQuery<"snapshot">;
      }) =>
        db
          .snapshot(query, session, (data) => {
            Logging.debug("Sending snapshot");
            socket.send(
              JSON.stringify({
                ns: "snapshot",
                data: { id, data },
              })
            );
          })
          .then((s) => {
            answer(id, s.snaphot);
            snapshotIds.set(id, s.id);
          })
          .catch((err) => answer(id, undefined, err))
    );

    handler.set("unsubscribe", async ({ id }) => {
      const subscriptionId = snapshotIds.get(id);
      if (subscriptionId) {
        db.unsubscribe(subscriptionId, session);
        // The map is keyed by the request id, not by the subscription id.
        snapshotIds.delete(id);
      }
    });

    socket.on("message", async (rawData: string) => {
      try {
        const message: { ns: string; data: any } = JSON.parse(rawData);
        const handle = handler.get(message.ns);
        if (handle) {
          // Awaited so that a failing handler ends up in the catch below
          // rather than as an unhandled rejection.
          await handle(message.data);
        }
      } catch (err) {
        Logging.errorMessage("Unknown Error:");
        Logging.error(err);
      }
    });

    socket.on("close", () => {
      Logging.log(`${session.id} has disconnected!`);
      session.subscriptions.forEach((unsubscribe) => unsubscribe());
      session.subscriptions.clear();
      snapshotIds.clear();
      socket.removeAllListeners();
    });
  }
}