This commit is contained in:
parent
7fe18a0037
commit
d6a565b2cb
@ -1,9 +1,6 @@
|
|||||||
<<<<<<< HEAD
|
|
||||||
root=true
|
root=true
|
||||||
=======
|
|
||||||
>>>>>>> 0bfdbce908484560108e20cede6f9e7c46710818
|
|
||||||
[*]
|
[*]
|
||||||
charset = utf-8
|
charset = utf-8
|
||||||
indent_size = 3
|
indent_size = 3
|
||||||
indent_style = space
|
indent_style = space
|
||||||
insert_final_newline = true
|
insert_final_newline = true
|
||||||
|
@ -1,122 +0,0 @@
|
|||||||
import Logging from "@hibas123/nodelogging";
|
|
||||||
import { IncomingMessage, Server } from "http";
|
|
||||||
import * as WebSocket from "ws";
|
|
||||||
import { DatabaseManager } from "./database/database";
|
|
||||||
import { CollectionQuery, DocumentQuery, IQuery, ITypedQuery } from "./database/query";
|
|
||||||
import Session from "./database/session";
|
|
||||||
import { verifyJWT } from "./helper/jwt";
|
|
||||||
import nanoid = require("nanoid");
|
|
||||||
|
|
||||||
export class ConnectionManager {
|
|
||||||
static server: WebSocket.Server;
|
|
||||||
|
|
||||||
static bind(server: Server) {
|
|
||||||
this.server = new WebSocket.Server({ server });
|
|
||||||
this.server.on("connection", this.onConnection.bind(this));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static async onConnection(socket: WebSocket, req: IncomingMessage) {
|
|
||||||
Logging.log("New Connection:");
|
|
||||||
const sendError = (error: string) => socket.send(JSON.stringify({ ns: "error_msg", data: error }));
|
|
||||||
|
|
||||||
const session = new Session(nanoid());
|
|
||||||
|
|
||||||
const query = new URL(req.url, "http://localhost").searchParams;
|
|
||||||
|
|
||||||
const database = query.get("database");
|
|
||||||
const db = DatabaseManager.getDatabase(database);
|
|
||||||
if (!db) {
|
|
||||||
sendError("Invalid Database!");
|
|
||||||
socket.close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const accesskey = query.get("accesskey");
|
|
||||||
if (db.accesskey) {
|
|
||||||
if (!accesskey || accesskey !== db.accesskey) {
|
|
||||||
sendError("Unauthorized!");
|
|
||||||
socket.close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const authkey = query.get("authkey");
|
|
||||||
if (authkey && db.publickey) {
|
|
||||||
let res = await verifyJWT(authkey, db.publickey);
|
|
||||||
if (!res || !res.uid) {
|
|
||||||
sendError("Invalid JWT");
|
|
||||||
socket.close();
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
session.uid = res.uid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const rootkey = query.get("rootkey");
|
|
||||||
if (rootkey && db.rootkey) {
|
|
||||||
if (rootkey === db.rootkey) {
|
|
||||||
session.root = true;
|
|
||||||
Logging.warning(`Somebody logged into ${database} via rootkey`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const answer = (id: string, data: any, error: boolean = false) => {
|
|
||||||
if (error)
|
|
||||||
Logging.error(error as any);
|
|
||||||
socket.send(JSON.stringify({ ns: "message", data: { id, error, data } }));
|
|
||||||
}
|
|
||||||
|
|
||||||
const handler = new Map<string, ((data: any) => void)>();
|
|
||||||
|
|
||||||
handler.set("v2", async ({ id, query }) => db.run(Array.isArray(query) ? query : [query], session)
|
|
||||||
.then(res => answer(id, res))
|
|
||||||
.catch(err => answer(id, undefined, err))
|
|
||||||
);
|
|
||||||
|
|
||||||
// handler.set("bulk", async ({ id, query }) => db.run(query, session)
|
|
||||||
// .then(res => answer(id, res))
|
|
||||||
// .catch(err => answer(id, undefined, err))
|
|
||||||
// );
|
|
||||||
|
|
||||||
|
|
||||||
const SnapshotMap = new Map<string, string>();
|
|
||||||
handler.set("snapshot", async ({ id, query }: { id: string, query: ITypedQuery<"snapshot"> }) => {
|
|
||||||
db.snapshot(query, session, (data => {
|
|
||||||
socket.send(JSON.stringify({
|
|
||||||
ns: "snapshot", data: { id, data }
|
|
||||||
}));
|
|
||||||
})).then(s => {
|
|
||||||
answer(id, s.snaphot);
|
|
||||||
SnapshotMap.set(id, s.id);
|
|
||||||
}).catch(err => answer(id, undefined, err));
|
|
||||||
})
|
|
||||||
|
|
||||||
handler.set("unsubscribe", async ({ id }) => {
|
|
||||||
let i = SnapshotMap.get(id);
|
|
||||||
if (i) {
|
|
||||||
db.unsubscribe(i, session);
|
|
||||||
SnapshotMap.delete(i);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
socket.on("message", async (rawData: string) => {
|
|
||||||
try {
|
|
||||||
let message: { ns: string, data: any } = JSON.parse(rawData);
|
|
||||||
let h = handler.get(message.ns);
|
|
||||||
if (h) {
|
|
||||||
h(message.data);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
Logging.errorMessage("Unknown Error:");
|
|
||||||
Logging.error(err);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
socket.on("close", () => {
|
|
||||||
Logging.log(`${session.id} has disconnected!`);
|
|
||||||
session.subscriptions.forEach(unsubscribe => unsubscribe());
|
|
||||||
session.subscriptions.clear();
|
|
||||||
socket.removeAllListeners();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user