Some optimisations

This commit is contained in:
User user
2021-05-19 13:13:48 +02:00
parent 95ef923844
commit ee3123f400
6 changed files with 209 additions and 111 deletions

View File

@ -1,29 +1,33 @@
import { Lock } from "@hibas123/utils";
import { AwaitStore, Lock } from "@hibas123/utils";
import * as fs from "fs";
import * as path from "path";
import { Adapter, Message, Formatted, LoggingTypes } from "@hibas123/logging";
import { once } from "events";
const MAX_FILE_SIZE = 500000000;
export class LoggingFiles implements Adapter {
export class FileAdapter implements Adapter {
level = LoggingTypes.Debug;
file: Files;
isInit = new AwaitStore(false);
constructor(private filename: string, private maxFileSize = MAX_FILE_SIZE) {}
setLevel(level: LoggingTypes) {
this.level = level;
}
init() {
async init() {
if (!this.file) {
this.file = Files.getFile(this.filename);
this.file.init(this.maxFileSize);
await this.file
.init(this.maxFileSize)
.then(() => this.isInit.send(true));
}
}
flush(sync: boolean) {
this.file.flush(sync);
// return this.file.flush(sync);
}
onMessage(message: Message) {
@ -31,9 +35,12 @@ export class LoggingFiles implements Adapter {
this.file.write(msg);
}
close() {
this.file.close();
this.file = undefined;
async close() {
if (this.file) {
await this.file.close();
this.file = undefined;
}
this.isInit.send(false);
}
}
@ -61,8 +68,6 @@ const Debounce = (callback: () => void, iv = 500, max = 100) => {
};
export class Files {
private open = 0;
private static files = new Map<string, Files>();
static getFile(filename: string): Files {
filename = path.resolve(filename);
@ -75,13 +80,15 @@ export class Files {
return file;
}
private maxFileSize = MAX_FILE_SIZE;
private size: number = 0;
private stream: fs.WriteStream = undefined;
private lock = new Lock();
private debounce = Debounce(this.checkQueue.bind(this));
private open = 0;
#maxFileSize = MAX_FILE_SIZE;
#size: number = 0;
#stream: fs.WriteStream = undefined;
#lock = new Lock();
#debounce = Debounce(this.checkQueue.bind(this));
#initialized = false;
#queue: Buffer[] = [];
public get initlialized() {
return this.#initialized;
@ -91,30 +98,38 @@ export class Files {
public async init(maxFileSize: number) {
if (this.#initialized) return;
let lock = await this.lock.getLock();
this.maxFileSize == maxFileSize;
this.#maxFileSize = maxFileSize;
let lock = await this.#lock.getLock();
const folder = path.dirname(this.file);
if (folder) {
if (!(await fsExists(folder))) {
await fsMkDir(folder).catch(() => {}); //Could happen, if two seperate instances want to create the same folder so ignoring
}
}
await this.initializeFile();
this.#initialized = true;
await this.checkQueue(true);
lock.release();
this.checkQueue();
}
private async initializeFile(new_file = false) {
console.time("init");
try {
if (this.stream) {
this.stream.close();
}
const folder = path.dirname(this.file);
if (folder) {
if (!(await fsExists(folder))) {
await fsMkDir(folder).catch(() => {}); //Could happen, if two seperate instances want to create the same folder so ignoring
}
if (this.#stream) {
const closePrms = once(this.#stream, "close");
this.#stream.end();
await closePrms;
}
let size = 0;
if (await fsExists(this.file)) {
let stats = await fsStat(this.file);
if (new_file || stats.size >= this.maxFileSize) {
if (new_file || stats.size >= this.#maxFileSize) {
if (await fsExists(this.file + ".old"))
await fsUnlink(this.file + ".old");
await fsMove(this.file, this.file + ".old");
@ -123,62 +138,77 @@ export class Files {
}
}
this.stream = fs.createWriteStream(this.file, { flags: "a" });
this.size = size;
this.#stream = fs.createWriteStream(this.file, { flags: "a" });
this.#size = size;
} catch (e) {
console.log(e);
//TODO: is this the right behavior?
//TODO: is this the right behavior? Probably not...
process.exit(1);
}
console.timeEnd("init");
}
private queue: Buffer[] = [];
async checkQueue() {
if (this.lock.locked) return;
let lock = await this.lock.getLock();
let msg: Buffer;
while ((msg = this.queue.shift())) {
await this.write_to_file(msg);
private async checkQueue(nolock: boolean = false) {
let lock: any;
if (nolock == false) {
if (this.#lock.locked) return;
lock = await this.#lock.getLock();
}
lock.release();
let entry: Buffer;
console.log("Check queue with", this.#queue.length);
let c = 0;
let buffer = Buffer.alloc(1024 * 128);
let ci = 0;
while ((entry = this.#queue.shift())) {
if (entry.length > buffer.length) {
this.write_to_file(entry.slice(0, ci));
ci = 0;
this.write_to_file(entry);
} else if (entry.length + ci > buffer.length) {
this.write_to_file(entry.slice(0, ci));
ci = 0;
entry.copy(buffer, ci);
ci += entry.length;
} else {
entry.copy(buffer, ci);
ci += entry.length;
}
}
if (ci > 0) {
await this.write_to_file(buffer.slice(0, ci));
}
console.log("Check queue real", c);
if (lock) lock.release();
}
public async close() {
await this.flush(false);
//TODO: maybe some raceconditions when open collides with close
const lock = await this.#lock.getLock();
await this.checkQueue(true);
this.open--;
if (this.open <= 0) {
this.stream.close();
const a = once(this.#stream, "close");
this.#stream.close();
await a;
Files.files.delete(this.file);
}
}
public flush(sync: boolean) {
if (sync) {
// if sync flush, the process most likely is in failstate, so checkQueue stopped its work.
let msg: Buffer;
while ((msg = this.queue.shift())) {
this.stream.write(msg);
}
} else {
return Promise.resolve().then(async () => {
const lock = await this.lock.getLock();
lock.release();
await this.checkQueue();
});
}
lock.release();
}
private async write_to_file(data: Buffer) {
try {
if (
data.byteLength < this.maxFileSize &&
this.size + data.byteLength > this.maxFileSize
data.byteLength < this.#maxFileSize &&
this.#size + data.byteLength > this.#maxFileSize
) {
await this.initializeFile(true);
}
this.size += data.byteLength;
this.stream.write(data);
this.#size += data.byteLength;
this.#stream.write(data);
} catch (err) {
// TODO: Better error handling!
console.error(err);
@ -188,14 +218,15 @@ export class Files {
}
public write(data: Buffer) {
this.queue.push(data);
this.debounce.trigger();
this.#queue.push(data);
this.#debounce.trigger();
}
public dispose() {}
}
function fsUnlink(path) {
function fsUnlink(path: string) {
if (fs.promises?.unlink) {
return fs.promises.unlink(path);
}
return new Promise<void>((resolve, reject) => {
fs.unlink(path, (err) => {
if (err) reject(err);
@ -205,6 +236,9 @@ function fsUnlink(path) {
}
function fsStat(path: string) {
if (fs.promises?.stat) {
return fs.promises.stat(path);
}
return new Promise<fs.Stats>((resolve, reject) => {
fs.stat(path, (err, stats) => {
if (err) reject(err);
@ -243,7 +277,7 @@ function fsMove(oldPath: string, newPath: string) {
function fsExists(path: string) {
return new Promise<boolean>((resolve, reject) => {
fs.exists(path, resolve);
fs.access(path, (err) => resolve(!err));
});
}