Add request flow handler
All checks were successful
the build was successful

This commit is contained in:
Fabian Stamm 2018-12-03 07:54:37 +01:00
parent 69094524d1
commit 5bf7af6565
7 changed files with 712 additions and 24 deletions

32
Readme.md Normal file
View File

@ -0,0 +1,32 @@
# NodeName-Core
This repository is part of the NodeName Project, a simple to use and very flexible Nameserver implementation for NodeJS.
## What does this NodeName-Core contain?
NodeName-Core is responsible for several things:
- parsing incoming DNS requests
- serializing outgoing answers
- providing storage backend API definition
## API
``` JavaScript
const Core = new DnsCore();
Core.addStorage(new AnyStoragePlugin(options), timeout?);
// One or multiple storage plugins
// All Storage engines are requested at the same time all their results will be processed
// Optionally an timeout can be defined after wich this storage engine will be ignored
// If no others answer an error is returned in Response
Core.addMonitoring(new MonitoringPlugin(options));
// Async called Monmitoring plugin.
Core.addListener(new ListenetPlugin(options));
// The question plugin can handle on or more Questions and needs to
// make shure, that it creates the desired output. It is responsible
// for handeling the question and optionally add additional questions
// to be resolved.
Core.addQuestionHandler(new QuestionPlugin(options));
```

View File

@ -28,4 +28,4 @@
"binary-parser": "^1.3.2",
"lru-cache": "^5.1.1"
}
}
}

171
src/benchmark.ts Normal file
View File

@ -0,0 +1,171 @@
import { Request, parseInput } from "./request";
import { RDATARecord, RecourceRecord } from "./record";
import { MonitoringPlugin, QuestionPlugin, DnsCore, AnswerHandler, ListenerPlugin, StoragePlugin, Record } from ".";
import assert = require("assert");
import { RecordTypes } from "./types";
import { Question } from "./question";
function fromHex(data: string) {
return Buffer.from(data.replace(/\s/g, ""), "hex");
}
const itr = 100000;
console.log("All tests x" + itr);
let reqData = fromHex("E835 0100 0001 0000 0000 0000 07 6578616D706c65 03636F6D 00 0001 0001");
console.time("input parser")
for (let i = 0; i < itr; i++) {
parseInput(reqData);
}
console.timeEnd("input parser")
console.time("complete")
for (let i = 0; i < itr; i++) {
let request = new Request(reqData, "")
let rr = new RDATARecord()
rr.CLASS = 1
rr.NAME = "example.com"
rr.TTL = 1600
rr.TYPE = 1
rr.RDATA = fromHex("0A 00 00 01")
request.addAnswer(rr)
request.serialize()
}
console.timeEnd("complete")
console.time("complete 10 answers")
for (let i = 0; i < itr; i++) {
let request = new Request(reqData, "")
let rr = new RDATARecord()
rr.CLASS = 1
rr.NAME = "example.com"
rr.TTL = 1600
rr.TYPE = 1
rr.RDATA = fromHex("0A 00 00 01")
for (let i = 0; i < 10; i++) request.addAnswer(rr);
request.serialize()
}
console.timeEnd("complete 10 answers")
class TestMonitoringPlugin implements MonitoringPlugin {
constructor() { }
async init(core) { }
Priority = 0
onRequest(domain, hostname, type) { }
}
class ARR extends RecourceRecord {
TYPE = RecordTypes.A
constructor(domain: string, ip: string) {
super();
this.NAME = domain
this.TTL = 1600
let data = Buffer.alloc(4)
let idx = 0;
ip.split(".").forEach(e => {
data.writeUInt8(Number(e), idx);
idx++;
})
this.dataLock = () => {
return {
length: 4,
serialize: (buffer, offset) => {
data.copy(buffer, offset);
return offset + 4;
}
}
};
}
}
class TestQuestionPlugin implements QuestionPlugin {
private Core: DnsCore
QuestionTypes = [RecordTypes.A]
Priority = 0
async init(core) {
this.Core = core;
}
async handleQuestion(question: Question, request: AnswerHandler, next: () => void) {
assert(this.QuestionTypes.find(e => e === <any>question.QTYPE), "Handler was called with not supported question type")
let parts = question.QNAME.split(".")
let domain = parts.splice(-2, 2).join(".")
let hostname = parts.join(".");
hostname = hostname !== "" ? hostname : "*"
let records = await this.Core.storageManager.getRecords(domain, hostname, question.QTYPE)
records.forEach(e => request.addAnswer(new ARR(question.QNAME, e.value)));
}
}
class TestListenerPlugin implements ListenerPlugin {
async init(core) { }
Priority = 0;
callback: (data: Buffer, sender: string, max_size?: number) => Promise<Buffer>;
registerCallback(callback) {
this.callback = callback;
}
sendRequest(message: Buffer) {
return this.callback(message, "localhost")
}
}
class TestStoragePlugin implements StoragePlugin {
re: Record = { domain: "example.com", hostname: "test", type: RecordTypes.A, value: "10.0.0.1", ttl: 1600 }
constructor(record?: Record, private count: number = 1) {
if (record)
this.re = record;
}
Priority = 0
NoCache = false
RecordTypes = [RecordTypes.A]
isResponsible(domain: string) {
// console.log("Is responsible")
return true
}
async init(core) { }
async getRecords(domain, hostname, type) {
if (domain !== this.re.domain || hostname !== this.re.hostname || type != this.re.type) return undefined;
let r = [];
for (let i = 0; i < this.count; i++) {
r.push(this.re);
}
return r;
}
async getAllRecordsForDomain(domain) {
if (domain !== this.re.domain) return undefined;
return [this.re];
}
}
let core = new DnsCore();
let listener = new TestListenerPlugin();
core.addListener(listener)
core.addMonitoring(new TestMonitoringPlugin());
core.addQuestion(new TestQuestionPlugin())
let storage = new TestStoragePlugin({
domain: "example.com",
hostname: "test",
type: RecordTypes.A,
ttl: 500,
value: "10.0.0.1"
}, 3)
core.addStorage(storage);
core.start().then(async () => {
let reqData = fromHex("E835 0100 0001 0000 0000 0000 04 74657374 07 6578616D706c65 03 636F6D 00 0001 0001");
console.time("core handler");
for (let i = 0; i < itr; i++) {
await listener.sendRequest(reqData)
}
console.timeEnd("core handler");
});

View File

@ -1,7 +1,309 @@
import * as Types from "./types";
import * as Request from "./request"
import { Request } from "./request";
import { RecordTypes } from "./types";
import * as LRU from "lru-cache";
import { Writable } from "stream";
import { Question } from "./question";
import { RecourceRecord } from "./record";
export = {
types: Types,
request: Request
// export class AnswerSteam extends Writable {
// }
export interface Record {
type: RecordTypes
domain: string
hostname: string
value: string
ttl: number
priority?: number
additional?: string
}
export interface Plugin {
/**
* This sets the priority of this Plugin.
*
* The lower the value, the higher priority.
* It is recommended to make this value changeable over the cosntructor options field.
*/
Priority: number
init(core: DnsCore): Promise<void>
}
export interface ListenerPlugin extends Plugin {
/**
* This method is for registering a callback that is called
* when packet is received and ready for parsing.
* The function returns the Answer data. Errors shouldn't be possible
* by this function
*/
registerCallback(callback: (data: Buffer, sender: string, max_size?: number) => Promise<Buffer>): void
registerCallback(callback: (data: Buffer, sender: string, answer: Writable) => Promise<void>)
}
export class ListenerManager {
private _listener: ListenerPlugin[] = []
add(listener: ListenerPlugin) {
this._listener.push(listener)
}
setup(core: DnsCore) {
return Promise.all(this._listener.map(e => e.init(core)))
}
registerCallback(callback: (data: Buffer, sender: string, max_size?: number) => Promise<Buffer>) {
this._listener.forEach(e => e.registerCallback(callback))
}
}
export interface MonitoringPlugin extends Plugin {
onRequest(domain: string, hostname: string, type: RecordTypes, additionalInformations): void
}
export class MonitoringManager {
private _monitoring: MonitoringPlugin[] = []
add(monitoring: MonitoringPlugin) {
this._monitoring.push(monitoring)
}
setup(core: DnsCore) {
return Promise.all(this._monitoring.map(e => e.init(core)))
}
onRequest(domain: string, hostname: string, type: RecordTypes, additionalInformations) {
this._monitoring.forEach(e => e.onRequest(domain, hostname, type, additionalInformations))
}
}
export interface StoragePlugin extends Plugin {
/**
* Here you can define wich record types this storage plugin is capable of.
*/
RecordTypes: RecordTypes[]
/**
* Disables record caching for this Storage
*/
NoCache: boolean;
/**
* Returns if storage plugin is responsible for this domain.
* This check is synchroneus because it should be quick.
*
* @param domain The domain for check
*/
isResponsible(domain: string): boolean
getRecords(domain: string, hostname: string, type: RecordTypes): Promise<Record[] | undefined>
getAllRecordsForDomain(domain: string): Promise<Record[]>
}
type SortedStorage = {
[P in keyof any]: StoragePlugin[] | undefined
}
const cacheoptions: LRU.Options = {
length: () => 1,
max: 500,
maxAge: 10 * 60 * 1000
}
class RecordCache {
private cache = new LRU<string, Record[]>(cacheoptions);
getRecords(domain: string, hostname: string, type: RecordTypes) {
let key = `${domain};;${hostname};;${RecordTypes[type]}`;
let rec = this.cache.get(key)
if (!rec) {
return undefined;
}
return rec;
}
addRecords(domain: string, hostname: string, type: RecordTypes, records: Record[], ttl: number) {
let key = `${domain};;${hostname};;${RecordTypes[type]}`;
this.cache.set(key, records, ttl);
}
}
export class StorageManager {
private _storages: StoragePlugin[] = []
private _sorted: SortedStorage;
private _cache = new RecordCache();
add(storage: StoragePlugin) {
this._storages.push(storage)
}
private presort() {
this._storages = this._storages.sort((e1, e2) => e1.Priority - e2.Priority)
this._sorted = <any>{}
for (let key in RecordTypes) {
let key_n = Number(key)
if (key_n !== Number.NaN) {
//The _storages list is sorted with priority.
this._sorted[key_n] = this._storages.filter(e => e.RecordTypes.find(t => t === key_n) !== undefined)
}
}
}
setup(core: DnsCore) {
this.presort()
return Promise.all(this._storages.map(e => e.init(core)))
}
async getAllRecordsForDomain(domain: string) {
return this._storages.find(e => e.isResponsible(domain)).getAllRecordsForDomain(domain);
}
async getRecords(domain: string, hostname: string, type: RecordTypes) {
// First check if record is cached. Since the dns entries should not chacnge quickly
// using the cach for quicker response times is the way to go.
let records = this._cache.getRecords(domain, hostname, type);
if (records) return records;
let storages = this._sorted[type].filter(s => s.isResponsible(domain));
if (!storages || storages.length <= 0) return undefined
let nocache = false;
for (let storage of storages) {
nocache = storage.NoCache || nocache;
records = await storage.getRecords(domain, hostname, type)
if (records) {
if (storage.NoCache) this._cache.addRecords(domain, hostname, type, records, records.length <= 0 ? 3600 : records[0].ttl)
return records;
}
}
// If any of the checked storage engines that is
// responsible for the domain has the noCache flag,
// caching will be disabled for all.
if (!nocache)
this._cache.addRecords(domain, hostname, type, [], 0);
return [];
}
}
export interface AnswerHandler {
addQuestion: (question: Question) => void;
addAnswer: (record: RecourceRecord) => void;
addAdditional: (record: RecourceRecord) => void;
addAutority: (record: RecourceRecord) => void
}
export interface QuestionPlugin extends Plugin {
/**
* This field defines wich question types this plugin is capable of handling.
*/
QuestionTypes: RecordTypes[];
init(core: DnsCore): Promise<void>
/**
*
* @param question The question this plugin should solve
* @param request This gives the plugin access to relevant
* functions for queing depending questions or adding answers etc.
* @param next This function should be called if the plugin is not able to resolve
* the request. This will trigger other handlers.
*/
handleQuestion(question: Question, request: AnswerHandler, next: () => void): Promise<void>
}
export class QuestionManager {
private _questions: QuestionPlugin[] = [];
private _sorted = <any>{};
add(question: QuestionPlugin) {
this._questions.push(question);
}
private sort() {
this._questions = this._questions.sort((e1, e2) => e1.Priority - e2.Priority);
this._sorted = {};
for (let key in RecordTypes) {
let key_n = Number(key)
if (key_n !== Number.NaN) {
this._sorted[key_n] = this._questions.filter(e => e.QuestionTypes.find(t => t === key_n) !== undefined)
}
}
}
setup(core: DnsCore) {
this.sort();
return Promise.all(this._questions.map(e => e.init(core)))
}
async handleQuestion(question: Question, request: AnswerHandler) {
let handlers: QuestionPlugin[] = this._sorted[question.QTYPE];
if (!handlers || handlers.length <= 0) return;
let index = 0;
while (index < handlers.length) {
let i = index;
await handlers[i].handleQuestion(question, request, () => {
index++;
})
if (i === index) break;
}
}
}
export class DnsCore {
storageManager: StorageManager;
addStorage(plugin: StoragePlugin) {
this.storageManager.add(plugin);
}
monitoringManager: MonitoringManager;
addMonitoring(plugin: MonitoringPlugin) {
this.monitoringManager.add(plugin);
}
listenerManager: ListenerManager;
addListener(plugin: ListenerPlugin) {
this.listenerManager.add(plugin);
}
questionManager: QuestionManager;
addQuestion(plugin: QuestionPlugin) {
this.questionManager.add(plugin);
}
constructor() {
this.listenerManager = new ListenerManager()
this.monitoringManager = new MonitoringManager()
this.storageManager = new StorageManager()
this.questionManager = new QuestionManager()
}
async start() {
await this.storageManager.setup(this);
await this.monitoringManager.setup(this);
await this.listenerManager.setup(this);
await this.questionManager.setup(this);
this.listenerManager.registerCallback(this.onMessage.bind(this));
}
private async onMessage(data: Buffer, sender: string, max_size: number = 0) {
let request = new Request(data, sender, max_size);
let questionqueue: Promise<void>[] = [];
let handleQuestion = question => {
let prom = this.questionManager.handleQuestion(question, {
addAdditional: record => {
request.addAdditionals(record)
},
addAnswer: record => {
request.addAnswer(record);
},
addAutority: record => {
request.addAuthorities(record);
},
addQuestion: question => {
handleQuestion(question)
}
})
questionqueue.push(prom);
}
request.questions.map(q => handleQuestion(q));
await Promise.all(questionqueue);
return request.serialize();
}
}

View File

@ -1,6 +1,6 @@
import { Parser } from "binary-parser"
import { IMessageHeader, IMessageQuestion, MessageRecourceRecord, ErrorCodes } from "./types"
import { Serializer } from "./serializeable";
import { Serializer, Serializeable } from "./serializeable";
import { Question } from "./question";
import { RecourceRecord } from "./record";
import { Header } from "./header";
@ -68,12 +68,23 @@ export class Request {
private _questions: Question[];
get questions() {
return this._questions.map(e => Object.assign({}, e));
return this._questions.map(e => new Question(e));
}
public _answers: RecourceRecord[] = [];
public _authorities: RecourceRecord[] = [];
public _additionals: RecourceRecord[] = [];
private _answers: Serializer[] = [];
addAnswer(rr: RecourceRecord) {
this._answers.push(rr.lock())
}
private _authorities: Serializer[] = [];
addAuthorities(rr: RecourceRecord) {
this._authorities.push(rr.lock())
}
private _additionals: Serializer[] = [];
addAdditionals(rr: RecourceRecord) {
this._additionals.push(rr.lock())
}
constructor(packet: Buffer, public sender: string, private max_size = 0) {
let parsed = parseInput(packet);
@ -103,9 +114,9 @@ export class Request {
let header = this._header.lock();
let questions = this._questions.map(e => e.lock())
let answers = this._answers.map(e => e.lock())
let authority = this._authorities.map(e => e.lock())
let additional = this._additionals.map(e => e.lock())
let answers = this._answers;
let authority = this._authorities;
let additional = this._additionals;
let length = header.length;
questions.forEach(e => length += e.length);

View File

@ -2,7 +2,7 @@ import { assert } from "chai";
import { parseInput, Request, questionParser, headerParser } from "./request"
import { IMessageHeader, RecordTypes } from "./types";
import { RDATARecord } from "./record";
import { RDATARecord, RecourceRecord } from "./record";
import { Question } from "./question";
import { Header } from "./header";
@ -332,8 +332,187 @@ describe("parser", function () {
rr.TTL = 1600
rr.TYPE = 1
rr.RDATA = fromHex("0A 00 00 01")
request._answers.push(rr)
request.addAnswer(rr);
let data = request.serialize()
assert.equal(data.toString("hex"), should.replace(/\s/g, "").toLowerCase(), "Whole packet serialization failed")
})
})
})
import { DnsCore, StoragePlugin, Record, MonitoringPlugin, QuestionPlugin, AnswerHandler, ListenerPlugin } from "./index"
describe("DNS Core", function () {
it("Initialization", () => {
let core = new DnsCore();
assert.exists(core, "Core constructor working")
})
it("Adding fake plugins and initialize", () => {
let core = new DnsCore();
core.addListener({
init: async (c) => { assert.exists(c, "Core not defined on Plugin init") },
Priority: 0,
registerCallback: () => { }
})
core.addMonitoring({
init: async (c) => { assert.exists(c, "Core not defined on Plugin init") },
Priority: 0,
onRequest: () => { }
})
core.addQuestion({
init: async (c) => { assert.exists(c, "Core not defined on Plugin init") },
QuestionTypes: [RecordTypes.A],
Priority: 0,
handleQuestion: async () => { },
})
core.addStorage({
getAllRecordsForDomain: async () => undefined,
getRecords: async () => undefined,
init: async (c) => { assert.exists(c, "Core not defined on Plugin init") },
isResponsible: () => true,
NoCache: false,
Priority: 0,
RecordTypes: [RecordTypes.A]
})
return core.start();
})
it("Test Request flow", async function () {
let core = new DnsCore();
let listener = new TestListenerPlugin();
core.addListener(listener)
let monitoring = new TestMonitoringPlugin({
domain: "example.com",
hostname: "test",
type: "A"
});
core.addMonitoring(monitoring)
let question = new TestQuestionPlugin();
core.addQuestion(question)
let storage = new TestStoragePlugin({
domain: "example.com",
hostname: "test",
type: RecordTypes.A,
ttl: 500,
value: "10.0.0.1"
})
core.addStorage(storage);
await core.start();
let reqData = fromHex("E835 0100 0001 0000 0000 0000 04 74657374 07 6578616D706c65 03 636F6D 00 0001 0001");
let should = "E835 8580 0001000100000000 04 74657374 076578616D706C6503636F6D0000010001 04 74657374 07 6578616D706C65 03 636F6D 00 0001 0001 0000 0640 0004 0A000001"
let resData = await listener.sendRequest(reqData)
assert.equal(resData.toString("hex"), should.replace(/\s/g, "").toLowerCase(), "Whole packet serialization failed")
})
})
class TestStoragePlugin implements StoragePlugin {
re: Record = { domain: "example.com", hostname: "test", type: RecordTypes.A, value: "10.0.0.1", ttl: 1600 }
constructor(record?: Record) {
if (record)
this.re = record;
}
Priority = 0
NoCache = false
RecordTypes = [RecordTypes.A]
isResponsible(domain: string) {
// console.log("Is responsible")
return true
}
async init(core) { assert.exists(core, "TestStoragePlugin init no core object") }
async getRecords(domain, hostname, type) {
if (domain !== this.re.domain || hostname !== this.re.hostname || type != this.re.type) return undefined;
return [this.re];
}
async getAllRecordsForDomain(domain) {
if (domain !== this.re.domain) return undefined;
return [this.re];
}
}
class TestMonitoringPlugin implements MonitoringPlugin {
constructor(private should: { domain: string, hostname: string, type: string }) { }
async init(core) { assert.exists(core, "TestMonitoringPlugin init no core object") }
Priority = 0
onRequest(domain, hostname, type) {
assert.equal(this.should.domain, domain, "Domain value wrong")
assert.equal(this.should.hostname, hostname, "Domain value wrong")
assert.equal(this.should.type, type, "Domain value wrong")
}
}
class ARR extends RecourceRecord {
TYPE = RecordTypes.A
constructor(domain: string, ip: string) {
super();
this.NAME = domain
this.TTL = 1600
let data = Buffer.alloc(4)
let idx = 0;
ip.split(".").forEach(e => {
data.writeUInt8(Number(e), idx);
idx++;
})
this.dataLock = () => {
return {
length: 4,
serialize: (buffer, offset) => {
data.copy(buffer, offset);
return offset + 4;
}
}
};
// this.RDATA = data;
}
}
class TestQuestionPlugin implements QuestionPlugin {
private Core: DnsCore
QuestionTypes = [RecordTypes.A]
Priority = 0
async init(core) {
assert.exists(core, "TestQuestionPlugin init no core object")
this.Core = core;
}
async handleQuestion(question: Question, request: AnswerHandler, next: () => void) {
assert(this.QuestionTypes.find(e => e === <any>question.QTYPE), "Handler was called with not supported questin type")
let parts = question.QNAME.split(".")
let domain = parts.splice(-2, 2).join(".")
let hostname = parts.join(".");
hostname = hostname !== "" ? hostname : "*"
let records = await this.Core.storageManager.getRecords(domain, hostname, question.QTYPE)
records.forEach(e => request.addAnswer(new ARR(question.QNAME, e.value)));
}
}
class TestListenerPlugin implements ListenerPlugin {
async init(core) { assert.exists(core, "TestListenerPlugin init no core object") }
Priority = 0;
callback: (data: Buffer, sender: string, max_size?: number) => Promise<Buffer>;
registerCallback(callback) {
this.callback = callback;
}
sendRequest(message: Buffer) {
return this.callback(message, "localhost")
}
}

View File

@ -305,10 +305,3 @@ export interface MessageRecourceRecord {
*/
RDATA: Buffer;
}
// export interface IMessage {
// header: IMessageHeader;
// questions: IMessageQuestion[];
// _answers: MessageRecourceRecord[];
// _authorities: MessageRecourceRecord[];
// _additionals: MessageRecourceRecord[];
// }