import { AwaitStore, Lock } from "@hibas123/utils";
import * as fs from "fs";
import * as path from "path";
import { Adapter, Message, Formatted, LoggingTypes } from "@hibas123/logging";
import { once } from "events";

/** Default size (in bytes) above which the log file is rotated to `<file>.old`. */
const MAX_FILE_SIZE = 500000000;

/**
 * Logging adapter that appends every received message as one line to a file.
 *
 * The actual file handling is delegated to a shared {@link Files} instance
 * obtained via `Files.getFile`, so several adapters pointing at the same path
 * share one write stream.
 */
export class FileAdapter implements Adapter {
  level = LoggingTypes.Debug;

  /** Backing file; set by `init()` and cleared again by `close()`. */
  file: Files;
  /** Set to `true` once `init()` has completed and back to `false` on `close()`. */
  isInit = new AwaitStore(false);

  /**
   * @param filename    Path of the log file.
   * @param maxFileSize Size in bytes handed to `Files.init` as rotation threshold.
   */
  constructor(private filename: string, private maxFileSize = MAX_FILE_SIZE) {}

  setLevel(level: LoggingTypes) {
    this.level = level;
  }

  /** Acquires and initialises the backing file. Does nothing if a file is already attached. */
  async init() {
    if (!this.file) {
      this.file = Files.getFile(this.filename);
      await this.file
        .init(this.maxFileSize)
        .then(() => this.isInit.send(true));
    }
  }

  /** Currently a no-op; flushing is driven by the debounce inside `Files`. */
  flush(sync: boolean) {
    // return this.file.flush(sync);
  }

  /**
   * Queues the message text (passed through `Formatted.strip`) plus a trailing
   * newline for writing. Requires `init()` to have been called before.
   */
  onMessage(message: Message) {
    let msg = Buffer.from(Formatted.strip(message.text) + "\n");
    this.file.write(msg);
  }

  /** Releases the backing file (if any) and marks the adapter as not initialised. */
  async close() {
    if (this.file) {
      await this.file.close();
      this.file = undefined;
    }
    this.isInit.send(false);
  }
}

//TODO: Optimise write path

/**
 * Creates a trigger that invokes `callback` either `iv` milliseconds after the
 * first trigger of a quiet period, or immediately once `max` triggers have
 * accumulated - whichever happens first.
 *
 * @param callback Function to invoke.
 * @param iv       Delay in milliseconds before a timer-driven invocation.
 * @param max      Number of triggers that forces an immediate invocation.
 */
const Debounce = (callback: () => void, iv = 500, max = 100) => {
  let to: ReturnType<typeof setTimeout> | undefined;
  let curr = 0;
  return {
    trigger: () => {
      curr++;
      if (curr >= max) {
        curr = 0; // not clearing timeout, since this is a very high cost operation
        callback();
      } else if (!to) {
        to = setTimeout(() => {
          to = undefined;
          curr = 0;
          callback();
        }, iv);
      }
    },
  };
};

/** Initial capacity of the pending-message queue. */
const QUEUE_START_SIZE = 10000;

/** Upper bound (in bytes) for a batch of queued messages merged into one stream write. */
const WRITE_CHUNK_SIZE = 1024 * 128;

/**
 * Reference-counted, append-only file writer with size-based rotation.
 *
 * Instances are shared per resolved path through {@link Files.getFile}.
 * Writes are queued in memory and flushed in batches by a debounce; when the
 * file would exceed the configured maximum size it is moved to `<file>.old`
 * and a new file is started.
 */
export class Files {
  /** Registry of open instances, keyed by resolved absolute path. */
  private static files = new Map<string, Files>();

  /**
   * Returns the shared instance for `filename` (creating it on first use) and
   * increments its reference count. Every call should be paired with `close()`.
   */
  static getFile(filename: string): Files {
    filename = path.resolve(filename);
    let file = this.files.get(filename);
    if (!file) {
      file = new Files(filename);
      this.files.set(filename, file);
    }
    file.open++;
    return file;
  }

  /** Number of outstanding `getFile` references. */
  private open = 0;

  #maxFileSize = MAX_FILE_SIZE;
  #size: number = 0;
  #stream: fs.WriteStream = undefined;
  #lock = new Lock();
  // The debounce ignores the callback's return value, so rejections are
  // handled here instead of surfacing as unhandled promise rejections.
  #debounce = Debounce(() => {
    this.checkQueue().catch((err) => console.error(err));
  });
  #initialized = false;
  #queue: Buffer[] = new Array(QUEUE_START_SIZE);
  #queueIdx = 0;

  /** Whether `init()` has completed. (Name kept misspelled for backward compatibility.) */
  public get initlialized() {
    return this.#initialized;
  }

  /** Correctly spelled alias of `initlialized`. */
  public get initialized() {
    return this.#initialized;
  }

  private constructor(private file: string) {}

  /**
   * Creates the parent folder if necessary, opens the write stream and flushes
   * everything queued so far. Subsequent calls on an initialised instance are
   * no-ops.
   *
   * @param maxFileSize Rotation threshold in bytes.
   */
  public async init(maxFileSize: number) {
    if (this.#initialized) return;
    const lock = await this.#lock.getLock();
    try {
      // A concurrent caller may have completed initialisation while this one
      // was waiting for the lock; do not reopen the stream in that case.
      if (this.#initialized) return;
      this.#maxFileSize = maxFileSize;

      const folder = path.dirname(this.file);
      if (folder) {
        if (!(await fsExists(folder))) {
          await fsMkDir(folder).catch(() => {}); //Could happen, if two separate instances want to create the same folder so ignoring
        }
      }

      await this.initializeFile();
      this.#initialized = true;
      await this.checkQueue(true);
    } finally {
      lock.release();
    }
  }

  /**
   * (Re)opens the write stream in append mode.
   *
   * An already open stream is ended and awaited first. If the file exists and
   * either `new_file` is set or it has reached the maximum size, it is moved
   * to `<file>.old` (replacing a previous `.old`). Any failure is logged and
   * terminates the process.
   *
   * @param new_file Force rotation even if the size limit is not reached.
   */
  private async initializeFile(new_file = false) {
    try {
      if (this.#stream) {
        const closePrms = once(this.#stream, "close");
        this.#stream.end();
        await closePrms;
      }
      let size = 0;
      if (await fsExists(this.file)) {
        let stats = await fsStat(this.file);
        if (new_file || stats.size >= this.#maxFileSize) {
          if (await fsExists(this.file + ".old"))
            await fsUnlink(this.file + ".old");
          await fsMove(this.file, this.file + ".old");
        } else {
          size = stats.size;
        }
      }

      this.#stream = fs.createWriteStream(this.file, { flags: "a" });
      this.#size = size;
    } catch (err) {
      console.log(err);
      //TODO: is this the right behavior? Probably not...
      process.exit(1);
    }
  }

  /**
   * Drains the in-memory queue to the file, merging small entries into
   * batches of at most `WRITE_CHUNK_SIZE` bytes.
   *
   * @param nolock When `true` the caller already holds the lock. Otherwise
   *               the lock is acquired here, and the call returns without
   *               doing anything if the lock is currently taken.
   */
  private async checkQueue(nolock: boolean = false) {
    let lock: { release: () => void } | undefined;
    if (!nolock) {
      //TODO: New design might cause new messages to be "stalled" till close or another message
      if (this.#lock.locked) return;
      lock = await this.#lock.getLock();
    }

    try {
      const queue = this.#queue;
      const queueCnt = this.#queueIdx;

      this.#queue = new Array(QUEUE_START_SIZE);
      this.#queueIdx = 0;

      // Each batch is materialised with Buffer.concat, i.e. into freshly
      // allocated memory. The stream keeps a reference to the chunk until it
      // is written, so a reused scratch buffer could be overwritten before
      // its contents reach the disk.
      let batch: Buffer[] = [];
      let batchSize = 0;

      for (let i = 0; i < queueCnt; i++) {
        const entry = queue[i];
        if (batchSize + entry.length > WRITE_CHUNK_SIZE) {
          if (batchSize > 0) {
            await this.write_to_file(Buffer.concat(batch, batchSize));
            batch = [];
            batchSize = 0;
          }
          if (entry.length > WRITE_CHUNK_SIZE) {
            // Too large for any batch: write it on its own.
            await this.write_to_file(entry);
            continue;
          }
        }
        batch.push(entry);
        batchSize += entry.length;
      }

      if (batchSize > 0) {
        await this.write_to_file(Buffer.concat(batch, batchSize));
      }
    } finally {
      if (lock) lock.release();
    }
  }

  /**
   * Flushes pending messages and drops one reference. When the last reference
   * is released the stream is closed and the instance is unregistered.
   */
  public async close() {
    const lock = await this.#lock.getLock();
    try {
      await this.checkQueue(true);
      this.open--;
      if (this.open <= 0) {
        // Unregister and reset before awaiting the stream, so that a
        // getFile()/init() racing with this close gets a fresh instance
        // instead of one whose stream is about to be closed.
        if (Files.files.get(this.file) === this) {
          Files.files.delete(this.file);
        }
        this.#initialized = false;

        const stream = this.#stream;
        if (stream) {
          this.#stream = undefined;
          const closed = once(stream, "close");
          stream.close();
          await closed;
        }
      }
    } finally {
      lock.release();
    }
  }

  /**
   * Hands `data` to the write stream, rotating the file first if appending
   * would exceed the maximum size (data that on its own exceeds the limit is
   * written without rotation).
   *
   * On failure the error is logged, the stream is reopened and the write is
   * retried once; a second failure is logged and the data is dropped.
   *
   * @param data  Bytes to append.
   * @param retry Whether a failed write may be retried.
   */
  private async write_to_file(data: Buffer, retry = true) {
    try {
      if (
        data.byteLength < this.#maxFileSize &&
        this.#size + data.byteLength > this.#maxFileSize
      ) {
        await this.initializeFile(true);
      }
      this.#stream.write(data);
      // Only count bytes that were actually handed to the stream.
      this.#size += data.byteLength;
    } catch (err) {
      // TODO: Better error handling!
      console.error(err);
      if (!retry) return;
      await this.initializeFile(false);
      await this.write_to_file(data, false);
    }
  }

  /** Queues `data` for writing and notifies the debounce; returns immediately. */
  public write(data: Buffer) {
    this.#queue[this.#queueIdx++] = data;
    this.#debounce.trigger();
  }
}

/** Removes `file`, preferring `fs.promises` and falling back to the callback API. */
function fsUnlink(file: string): Promise<void> {
  if (fs.promises?.unlink) {
    return fs.promises.unlink(file);
  }
  return new Promise<void>((resolve, reject) => {
    fs.unlink(file, (err) => {
      if (err) reject(err);
      else resolve();
    });
  });
}

/** Returns the stats of `file`, preferring `fs.promises` and falling back to the callback API. */
function fsStat(file: string): Promise<fs.Stats> {
  if (fs.promises?.stat) {
    return fs.promises.stat(file);
  }
  return new Promise<fs.Stats>((resolve, reject) => {
    fs.stat(file, (err, stats) => {
      if (err) reject(err);
      else resolve(stats);
    });
  });
}

/**
 * Moves `oldPath` to `newPath`. Uses `fs.rename` and, when that fails with
 * `EXDEV`, falls back to copy + unlink.
 */
function fsMove(oldPath: string, newPath: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const callback = (err?: NodeJS.ErrnoException | null) => {
      if (err) reject(err);
      else resolve();
    };

    const copy = () => {
      fs.copyFile(oldPath, newPath, (err) => {
        if (err) callback(err);
        else fs.unlink(oldPath, callback);
      });
    };

    fs.rename(oldPath, newPath, (err) => {
      if (!err) {
        callback();
      } else if (err.code === "EXDEV") {
        copy();
      } else {
        callback(err);
      }
    });
  });
}

/** Resolves to `true` when `fs.access` succeeds for `target`, `false` otherwise. Never rejects. */
function fsExists(target: string): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    fs.access(target, (err) => resolve(!err));
  });
}

/** Creates the directory `dir`, including any missing parent directories. */
function fsMkDir(dir: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    fs.mkdir(dir, { recursive: true }, (err) =>
      err ? reject(err) : resolve()
    );
  });
}