From 5bf7af656516997030d911dea4a2f1f1afef0f36 Mon Sep 17 00:00:00 2001 From: Fabian Stamm Date: Mon, 3 Dec 2018 07:54:37 +0100 Subject: [PATCH] Add request flow handler --- Readme.md | 32 +++++ package.json | 2 +- src/benchmark.ts | 171 ++++++++++++++++++++++++++ src/index.ts | 312 ++++++++++++++++++++++++++++++++++++++++++++++- src/request.ts | 27 ++-- src/test.ts | 185 +++++++++++++++++++++++++++- src/types.ts | 7 -- 7 files changed, 712 insertions(+), 24 deletions(-) create mode 100644 Readme.md create mode 100644 src/benchmark.ts diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..b62748c --- /dev/null +++ b/Readme.md @@ -0,0 +1,32 @@ +# NodeName-Core +This repository is part of the NodeName Project, a simple to use and very flexible Nameserver implementation for NodeJS. + +## What does this NodeName-Core contain? + +NodeName-Core is responsible for several things: + +- parsing incoming DNS requests +- serializing outgoing answers +- providing storage backend API definition + +## API + +``` JavaScript + const Core = new DnsCore(); + Core.addStorage(new AnyStoragePlugin(options), timeout?); + // One or multiple storage plugins + // All Storage engines are requested at the same time all their results will be processed + // Optionally an timeout can be defined after wich this storage engine will be ignored + // If no others answer an error is returned in Response + + Core.addMonitoring(new MonitoringPlugin(options)); + // Async called Monmitoring plugin. + + Core.addListener(new ListenetPlugin(options)); + + // The question plugin can handle on or more Questions and needs to + // make shure, that it creates the desired output. It is responsible + // for handeling the question and optionally add additional questions + // to be resolved. + Core.addQuestionHandler(new QuestionPlugin(options)); +``` diff --git a/package.json b/package.json index 8fb6d08..db2f4cb 100644 --- a/package.json +++ b/package.json @@ -28,4 +28,4 @@ "binary-parser": "^1.3.2", "lru-cache": "^5.1.1" } -} \ No newline at end of file +} diff --git a/src/benchmark.ts b/src/benchmark.ts new file mode 100644 index 0000000..5761cf0 --- /dev/null +++ b/src/benchmark.ts @@ -0,0 +1,171 @@ +import { Request, parseInput } from "./request"; +import { RDATARecord, RecourceRecord } from "./record"; +import { MonitoringPlugin, QuestionPlugin, DnsCore, AnswerHandler, ListenerPlugin, StoragePlugin, Record } from "."; +import assert = require("assert"); +import { RecordTypes } from "./types"; +import { Question } from "./question"; + +function fromHex(data: string) { + return Buffer.from(data.replace(/\s/g, ""), "hex"); +} + +const itr = 100000; +console.log("All tests x" + itr); + +let reqData = fromHex("E835 0100 0001 0000 0000 0000 07 6578616D706c65 03636F6D 00 0001 0001"); +console.time("input parser") +for (let i = 0; i < itr; i++) { + parseInput(reqData); +} +console.timeEnd("input parser") + +console.time("complete") +for (let i = 0; i < itr; i++) { + let request = new Request(reqData, "") + let rr = new RDATARecord() + rr.CLASS = 1 + rr.NAME = "example.com" + rr.TTL = 1600 + rr.TYPE = 1 + rr.RDATA = fromHex("0A 00 00 01") + request.addAnswer(rr) + request.serialize() +} +console.timeEnd("complete") + +console.time("complete 10 answers") +for (let i = 0; i < itr; i++) { + let request = new Request(reqData, "") + let rr = new RDATARecord() + rr.CLASS = 1 + rr.NAME = "example.com" + rr.TTL = 1600 + rr.TYPE = 1 + rr.RDATA = fromHex("0A 00 00 01") + for (let i = 0; i < 10; i++) request.addAnswer(rr); + request.serialize() +} +console.timeEnd("complete 10 answers") + +class TestMonitoringPlugin implements MonitoringPlugin { + constructor() { } + async init(core) { } + Priority = 0 + onRequest(domain, hostname, type) { } +} + +class ARR extends RecourceRecord { + TYPE = RecordTypes.A + + constructor(domain: string, ip: string) { + super(); + this.NAME = domain + this.TTL = 1600 + let data = Buffer.alloc(4) + let idx = 0; + ip.split(".").forEach(e => { + data.writeUInt8(Number(e), idx); + idx++; + }) + this.dataLock = () => { + return { + length: 4, + serialize: (buffer, offset) => { + data.copy(buffer, offset); + return offset + 4; + } + } + }; + } +} + +class TestQuestionPlugin implements QuestionPlugin { + private Core: DnsCore + QuestionTypes = [RecordTypes.A] + Priority = 0 + async init(core) { + this.Core = core; + } + + async handleQuestion(question: Question, request: AnswerHandler, next: () => void) { + assert(this.QuestionTypes.find(e => e === question.QTYPE), "Handler was called with not supported question type") + let parts = question.QNAME.split(".") + let domain = parts.splice(-2, 2).join(".") + let hostname = parts.join("."); + hostname = hostname !== "" ? hostname : "*" + let records = await this.Core.storageManager.getRecords(domain, hostname, question.QTYPE) + records.forEach(e => request.addAnswer(new ARR(question.QNAME, e.value))); + } +} + +class TestListenerPlugin implements ListenerPlugin { + async init(core) { } + Priority = 0; + callback: (data: Buffer, sender: string, max_size?: number) => Promise; + registerCallback(callback) { + this.callback = callback; + } + + sendRequest(message: Buffer) { + return this.callback(message, "localhost") + } +} + + +class TestStoragePlugin implements StoragePlugin { + re: Record = { domain: "example.com", hostname: "test", type: RecordTypes.A, value: "10.0.0.1", ttl: 1600 } + + constructor(record?: Record, private count: number = 1) { + if (record) + this.re = record; + } + + Priority = 0 + NoCache = false + RecordTypes = [RecordTypes.A] + isResponsible(domain: string) { + // console.log("Is responsible") + return true + } + + async init(core) { } + + async getRecords(domain, hostname, type) { + if (domain !== this.re.domain || hostname !== this.re.hostname || type != this.re.type) return undefined; + let r = []; + for (let i = 0; i < this.count; i++) { + r.push(this.re); + } + return r; + } + + async getAllRecordsForDomain(domain) { + if (domain !== this.re.domain) return undefined; + return [this.re]; + } +} + +let core = new DnsCore(); + +let listener = new TestListenerPlugin(); +core.addListener(listener) +core.addMonitoring(new TestMonitoringPlugin()); +core.addQuestion(new TestQuestionPlugin()) + +let storage = new TestStoragePlugin({ + domain: "example.com", + hostname: "test", + type: RecordTypes.A, + ttl: 500, + value: "10.0.0.1" +}, 3) +core.addStorage(storage); + +core.start().then(async () => { + let reqData = fromHex("E835 0100 0001 0000 0000 0000 04 74657374 07 6578616D706c65 03 636F6D 00 0001 0001"); + console.time("core handler"); + for (let i = 0; i < itr; i++) { + await listener.sendRequest(reqData) + } + console.timeEnd("core handler"); +}); \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 27b2419..ddd66c2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,309 @@ -import * as Types from "./types"; -import * as Request from "./request" +import { Request } from "./request"; +import { RecordTypes } from "./types"; +import * as LRU from "lru-cache"; +import { Writable } from "stream"; +import { Question } from "./question"; +import { RecourceRecord } from "./record"; -export = { - types: Types, - request: Request +// export class AnswerSteam extends Writable { + +// } + +export interface Record { + type: RecordTypes + domain: string + hostname: string + value: string + ttl: number + priority?: number + additional?: string } + +export interface Plugin { + /** + * This sets the priority of this Plugin. + * + * The lower the value, the higher priority. + * It is recommended to make this value changeable over the cosntructor options field. + */ + Priority: number + + init(core: DnsCore): Promise +} + +export interface ListenerPlugin extends Plugin { + /** + * This method is for registering a callback that is called + * when packet is received and ready for parsing. + * The function returns the Answer data. Errors shouldn't be possible + * by this function + */ + registerCallback(callback: (data: Buffer, sender: string, max_size?: number) => Promise): void + registerCallback(callback: (data: Buffer, sender: string, answer: Writable) => Promise) +} + +export class ListenerManager { + private _listener: ListenerPlugin[] = [] + + add(listener: ListenerPlugin) { + this._listener.push(listener) + } + + setup(core: DnsCore) { + return Promise.all(this._listener.map(e => e.init(core))) + } + + registerCallback(callback: (data: Buffer, sender: string, max_size?: number) => Promise) { + this._listener.forEach(e => e.registerCallback(callback)) + } +} + +export interface MonitoringPlugin extends Plugin { + onRequest(domain: string, hostname: string, type: RecordTypes, additionalInformations): void +} + +export class MonitoringManager { + private _monitoring: MonitoringPlugin[] = [] + + add(monitoring: MonitoringPlugin) { + this._monitoring.push(monitoring) + } + + setup(core: DnsCore) { + return Promise.all(this._monitoring.map(e => e.init(core))) + } + + onRequest(domain: string, hostname: string, type: RecordTypes, additionalInformations) { + this._monitoring.forEach(e => e.onRequest(domain, hostname, type, additionalInformations)) + } +} + +export interface StoragePlugin extends Plugin { + /** + * Here you can define wich record types this storage plugin is capable of. + */ + RecordTypes: RecordTypes[] + + /** + * Disables record caching for this Storage + */ + NoCache: boolean; + + /** + * Returns if storage plugin is responsible for this domain. + * This check is synchroneus because it should be quick. + * + * @param domain The domain for check + */ + isResponsible(domain: string): boolean + getRecords(domain: string, hostname: string, type: RecordTypes): Promise + getAllRecordsForDomain(domain: string): Promise +} + +type SortedStorage = { + [P in keyof any]: StoragePlugin[] | undefined +} + +const cacheoptions: LRU.Options = { + length: () => 1, + max: 500, + maxAge: 10 * 60 * 1000 +} + +class RecordCache { + private cache = new LRU(cacheoptions); + + getRecords(domain: string, hostname: string, type: RecordTypes) { + let key = `${domain};;${hostname};;${RecordTypes[type]}`; + let rec = this.cache.get(key) + if (!rec) { + return undefined; + } + return rec; + } + + addRecords(domain: string, hostname: string, type: RecordTypes, records: Record[], ttl: number) { + let key = `${domain};;${hostname};;${RecordTypes[type]}`; + this.cache.set(key, records, ttl); + } +} + +export class StorageManager { + private _storages: StoragePlugin[] = [] + private _sorted: SortedStorage; + private _cache = new RecordCache(); + + add(storage: StoragePlugin) { + this._storages.push(storage) + } + + private presort() { + this._storages = this._storages.sort((e1, e2) => e1.Priority - e2.Priority) + this._sorted = {} + for (let key in RecordTypes) { + let key_n = Number(key) + if (key_n !== Number.NaN) { + //The _storages list is sorted with priority. + this._sorted[key_n] = this._storages.filter(e => e.RecordTypes.find(t => t === key_n) !== undefined) + } + } + } + + setup(core: DnsCore) { + this.presort() + return Promise.all(this._storages.map(e => e.init(core))) + } + + async getAllRecordsForDomain(domain: string) { + return this._storages.find(e => e.isResponsible(domain)).getAllRecordsForDomain(domain); + } + + async getRecords(domain: string, hostname: string, type: RecordTypes) { + // First check if record is cached. Since the dns entries should not chacnge quickly + // using the cach for quicker response times is the way to go. + let records = this._cache.getRecords(domain, hostname, type); + if (records) return records; + + let storages = this._sorted[type].filter(s => s.isResponsible(domain)); + if (!storages || storages.length <= 0) return undefined + + let nocache = false; + for (let storage of storages) { + nocache = storage.NoCache || nocache; + records = await storage.getRecords(domain, hostname, type) + if (records) { + if (storage.NoCache) this._cache.addRecords(domain, hostname, type, records, records.length <= 0 ? 3600 : records[0].ttl) + return records; + } + } + + // If any of the checked storage engines that is + // responsible for the domain has the noCache flag, + // caching will be disabled for all. + if (!nocache) + this._cache.addRecords(domain, hostname, type, [], 0); + return []; + } +} + +export interface AnswerHandler { + addQuestion: (question: Question) => void; + addAnswer: (record: RecourceRecord) => void; + addAdditional: (record: RecourceRecord) => void; + addAutority: (record: RecourceRecord) => void +} + +export interface QuestionPlugin extends Plugin { + /** + * This field defines wich question types this plugin is capable of handling. + */ + QuestionTypes: RecordTypes[]; + init(core: DnsCore): Promise + + /** + * + * @param question The question this plugin should solve + * @param request This gives the plugin access to relevant + * functions for queing depending questions or adding answers etc. + * @param next This function should be called if the plugin is not able to resolve + * the request. This will trigger other handlers. + */ + handleQuestion(question: Question, request: AnswerHandler, next: () => void): Promise +} + +export class QuestionManager { + private _questions: QuestionPlugin[] = []; + private _sorted = {}; + add(question: QuestionPlugin) { + this._questions.push(question); + } + + private sort() { + this._questions = this._questions.sort((e1, e2) => e1.Priority - e2.Priority); + this._sorted = {}; + for (let key in RecordTypes) { + let key_n = Number(key) + if (key_n !== Number.NaN) { + this._sorted[key_n] = this._questions.filter(e => e.QuestionTypes.find(t => t === key_n) !== undefined) + } + } + } + + setup(core: DnsCore) { + this.sort(); + return Promise.all(this._questions.map(e => e.init(core))) + } + + async handleQuestion(question: Question, request: AnswerHandler) { + let handlers: QuestionPlugin[] = this._sorted[question.QTYPE]; + if (!handlers || handlers.length <= 0) return; + let index = 0; + while (index < handlers.length) { + let i = index; + await handlers[i].handleQuestion(question, request, () => { + index++; + }) + if (i === index) break; + } + } +} + +export class DnsCore { + storageManager: StorageManager; + addStorage(plugin: StoragePlugin) { + this.storageManager.add(plugin); + } + monitoringManager: MonitoringManager; + addMonitoring(plugin: MonitoringPlugin) { + this.monitoringManager.add(plugin); + } + listenerManager: ListenerManager; + addListener(plugin: ListenerPlugin) { + this.listenerManager.add(plugin); + } + + questionManager: QuestionManager; + addQuestion(plugin: QuestionPlugin) { + this.questionManager.add(plugin); + } + + constructor() { + this.listenerManager = new ListenerManager() + this.monitoringManager = new MonitoringManager() + this.storageManager = new StorageManager() + this.questionManager = new QuestionManager() + } + + async start() { + await this.storageManager.setup(this); + await this.monitoringManager.setup(this); + await this.listenerManager.setup(this); + await this.questionManager.setup(this); + this.listenerManager.registerCallback(this.onMessage.bind(this)); + } + + private async onMessage(data: Buffer, sender: string, max_size: number = 0) { + let request = new Request(data, sender, max_size); + let questionqueue: Promise[] = []; + let handleQuestion = question => { + let prom = this.questionManager.handleQuestion(question, { + addAdditional: record => { + request.addAdditionals(record) + }, + addAnswer: record => { + request.addAnswer(record); + }, + addAutority: record => { + request.addAuthorities(record); + }, + addQuestion: question => { + handleQuestion(question) + } + }) + questionqueue.push(prom); + } + request.questions.map(q => handleQuestion(q)); + await Promise.all(questionqueue); + return request.serialize(); + } +} \ No newline at end of file diff --git a/src/request.ts b/src/request.ts index abbd98b..833309d 100644 --- a/src/request.ts +++ b/src/request.ts @@ -1,6 +1,6 @@ import { Parser } from "binary-parser" import { IMessageHeader, IMessageQuestion, MessageRecourceRecord, ErrorCodes } from "./types" -import { Serializer } from "./serializeable"; +import { Serializer, Serializeable } from "./serializeable"; import { Question } from "./question"; import { RecourceRecord } from "./record"; import { Header } from "./header"; @@ -68,12 +68,23 @@ export class Request { private _questions: Question[]; get questions() { - return this._questions.map(e => Object.assign({}, e)); + return this._questions.map(e => new Question(e)); } - public _answers: RecourceRecord[] = []; - public _authorities: RecourceRecord[] = []; - public _additionals: RecourceRecord[] = []; + private _answers: Serializer[] = []; + addAnswer(rr: RecourceRecord) { + this._answers.push(rr.lock()) + } + + private _authorities: Serializer[] = []; + addAuthorities(rr: RecourceRecord) { + this._authorities.push(rr.lock()) + } + + private _additionals: Serializer[] = []; + addAdditionals(rr: RecourceRecord) { + this._additionals.push(rr.lock()) + } constructor(packet: Buffer, public sender: string, private max_size = 0) { let parsed = parseInput(packet); @@ -103,9 +114,9 @@ export class Request { let header = this._header.lock(); let questions = this._questions.map(e => e.lock()) - let answers = this._answers.map(e => e.lock()) - let authority = this._authorities.map(e => e.lock()) - let additional = this._additionals.map(e => e.lock()) + let answers = this._answers; + let authority = this._authorities; + let additional = this._additionals; let length = header.length; questions.forEach(e => length += e.length); diff --git a/src/test.ts b/src/test.ts index 5176786..746bf12 100644 --- a/src/test.ts +++ b/src/test.ts @@ -2,7 +2,7 @@ import { assert } from "chai"; import { parseInput, Request, questionParser, headerParser } from "./request" import { IMessageHeader, RecordTypes } from "./types"; -import { RDATARecord } from "./record"; +import { RDATARecord, RecourceRecord } from "./record"; import { Question } from "./question"; import { Header } from "./header"; @@ -332,8 +332,187 @@ describe("parser", function () { rr.TTL = 1600 rr.TYPE = 1 rr.RDATA = fromHex("0A 00 00 01") - request._answers.push(rr) + request.addAnswer(rr); let data = request.serialize() assert.equal(data.toString("hex"), should.replace(/\s/g, "").toLowerCase(), "Whole packet serialization failed") }) -}) \ No newline at end of file +}) + +import { DnsCore, StoragePlugin, Record, MonitoringPlugin, QuestionPlugin, AnswerHandler, ListenerPlugin } from "./index" + +describe("DNS Core", function () { + it("Initialization", () => { + let core = new DnsCore(); + assert.exists(core, "Core constructor working") + }) + + it("Adding fake plugins and initialize", () => { + let core = new DnsCore(); + + core.addListener({ + init: async (c) => { assert.exists(c, "Core not defined on Plugin init") }, + Priority: 0, + registerCallback: () => { } + }) + + core.addMonitoring({ + init: async (c) => { assert.exists(c, "Core not defined on Plugin init") }, + Priority: 0, + onRequest: () => { } + }) + + core.addQuestion({ + init: async (c) => { assert.exists(c, "Core not defined on Plugin init") }, + QuestionTypes: [RecordTypes.A], + Priority: 0, + handleQuestion: async () => { }, + }) + + core.addStorage({ + getAllRecordsForDomain: async () => undefined, + getRecords: async () => undefined, + init: async (c) => { assert.exists(c, "Core not defined on Plugin init") }, + isResponsible: () => true, + NoCache: false, + Priority: 0, + RecordTypes: [RecordTypes.A] + }) + + return core.start(); + }) + + it("Test Request flow", async function () { + let core = new DnsCore(); + + let listener = new TestListenerPlugin(); + core.addListener(listener) + + let monitoring = new TestMonitoringPlugin({ + domain: "example.com", + hostname: "test", + type: "A" + }); + core.addMonitoring(monitoring) + + + let question = new TestQuestionPlugin(); + core.addQuestion(question) + + let storage = new TestStoragePlugin({ + domain: "example.com", + hostname: "test", + type: RecordTypes.A, + ttl: 500, + value: "10.0.0.1" + }) + core.addStorage(storage); + + await core.start(); + + let reqData = fromHex("E835 0100 0001 0000 0000 0000 04 74657374 07 6578616D706c65 03 636F6D 00 0001 0001"); + let should = "E835 8580 0001000100000000 04 74657374 076578616D706C6503636F6D0000010001 04 74657374 07 6578616D706C65 03 636F6D 00 0001 0001 0000 0640 0004 0A000001" + let resData = await listener.sendRequest(reqData) + + assert.equal(resData.toString("hex"), should.replace(/\s/g, "").toLowerCase(), "Whole packet serialization failed") + }) +}) + +class TestStoragePlugin implements StoragePlugin { + re: Record = { domain: "example.com", hostname: "test", type: RecordTypes.A, value: "10.0.0.1", ttl: 1600 } + + constructor(record?: Record) { + if (record) + this.re = record; + } + + Priority = 0 + NoCache = false + RecordTypes = [RecordTypes.A] + isResponsible(domain: string) { + // console.log("Is responsible") + return true + } + + async init(core) { assert.exists(core, "TestStoragePlugin init no core object") } + + async getRecords(domain, hostname, type) { + if (domain !== this.re.domain || hostname !== this.re.hostname || type != this.re.type) return undefined; + return [this.re]; + } + + async getAllRecordsForDomain(domain) { + if (domain !== this.re.domain) return undefined; + return [this.re]; + } +} + +class TestMonitoringPlugin implements MonitoringPlugin { + constructor(private should: { domain: string, hostname: string, type: string }) { } + + async init(core) { assert.exists(core, "TestMonitoringPlugin init no core object") } + Priority = 0 + onRequest(domain, hostname, type) { + assert.equal(this.should.domain, domain, "Domain value wrong") + assert.equal(this.should.hostname, hostname, "Domain value wrong") + assert.equal(this.should.type, type, "Domain value wrong") + } +} + +class ARR extends RecourceRecord { + TYPE = RecordTypes.A + + constructor(domain: string, ip: string) { + super(); + this.NAME = domain + this.TTL = 1600 + let data = Buffer.alloc(4) + let idx = 0; + ip.split(".").forEach(e => { + data.writeUInt8(Number(e), idx); + idx++; + }) + this.dataLock = () => { + return { + length: 4, + serialize: (buffer, offset) => { + data.copy(buffer, offset); + return offset + 4; + } + } + }; + // this.RDATA = data; + } +} + +class TestQuestionPlugin implements QuestionPlugin { + private Core: DnsCore + QuestionTypes = [RecordTypes.A] + Priority = 0 + async init(core) { + assert.exists(core, "TestQuestionPlugin init no core object") + this.Core = core; + } + + async handleQuestion(question: Question, request: AnswerHandler, next: () => void) { + assert(this.QuestionTypes.find(e => e === question.QTYPE), "Handler was called with not supported questin type") + let parts = question.QNAME.split(".") + let domain = parts.splice(-2, 2).join(".") + let hostname = parts.join("."); + hostname = hostname !== "" ? hostname : "*" + let records = await this.Core.storageManager.getRecords(domain, hostname, question.QTYPE) + records.forEach(e => request.addAnswer(new ARR(question.QNAME, e.value))); + } +} + +class TestListenerPlugin implements ListenerPlugin { + async init(core) { assert.exists(core, "TestListenerPlugin init no core object") } + Priority = 0; + callback: (data: Buffer, sender: string, max_size?: number) => Promise; + registerCallback(callback) { + this.callback = callback; + } + + sendRequest(message: Buffer) { + return this.callback(message, "localhost") + } +} \ No newline at end of file diff --git a/src/types.ts b/src/types.ts index 5074988..f5f3863 100644 --- a/src/types.ts +++ b/src/types.ts @@ -305,10 +305,3 @@ export interface MessageRecourceRecord { */ RDATA: Buffer; } -// export interface IMessage { -// header: IMessageHeader; -// questions: IMessageQuestion[]; -// _answers: MessageRecourceRecord[]; -// _authorities: MessageRecourceRecord[]; -// _additionals: MessageRecourceRecord[]; -// }