From 1a7400e38c32d8991aef0c23419e9e37b70ab308 Mon Sep 17 00:00:00 2001 From: user Date: Thu, 26 Aug 2021 15:13:36 +0200 Subject: [PATCH] Starting to implement push --- src/com/pushobjectrequest.ts | 148 ++++++++++++++++++++++++++++++++++ src/com/pushobjectresponse.ts | 109 +++++++++++++++++++++++++ src/com/sync_client.ts | 27 +++++++ src/com/sync_server.ts | 10 +++ src/sync.ts | 48 ++++++++--- sync_proto.binc | 11 +++ 6 files changed, 341 insertions(+), 12 deletions(-) create mode 100644 src/com/pushobjectrequest.ts create mode 100644 src/com/pushobjectresponse.ts diff --git a/src/com/pushobjectrequest.ts b/src/com/pushobjectrequest.ts new file mode 100644 index 0000000..f366c11 --- /dev/null +++ b/src/com/pushobjectrequest.ts @@ -0,0 +1,148 @@ +export default interface PushObjectRequest { + id: string; + type: string; + data: Uint8Array; +} +export function serialize_PushObjectRequest (data: PushObjectRequest): Uint8Array { + let fields: ArrayBuffer[] = []; + const addBuffer = (buffer:ArrayBuffer) => fields.push(buffer); + const makeField = (payload_length: number, label: number, type: number) => { + let b = new ArrayBuffer(3 + payload_length); + new DataView(b).setUint16(0, label); + new DataView(b).setUint8(2, type); + fields.push(b); + return new DataView(b, 3); + } + + const makeArrayBuffer = (payload_length: number) => { + let b = new ArrayBuffer(payload_length); + fields.push(b) + return new DataView(b) + } + + const getAutoGrowLength = (length: number | bigint) => { + if (typeof length === "number") length = BigInt(length); + const lt1 = length >= BigInt(Math.pow(2, 15)) ? 0x8000 : 0; + const lt2 = length >= BigInt(Math.pow(2, 30)) ? 0x8000 : 0; + const lt3 = length >= BigInt(Math.pow(2, 45)) ? 0x8000 : 0; + const lt4 = length >= BigInt(Math.pow(2, 61)); + if (lt4) throw new Error("Payload to large!"); + let blen = BigInt(length); + const u16_1 = (Number(blen >> BigInt(0)) & 0x7fff) | lt1; + const u16_2 = (Number(blen >> BigInt(15)) & 0x7fff) | lt2; + const u16_3 = (Number(blen >> BigInt(30)) & 0x7fff) | lt3; + const u16_4 = Number(blen >> BigInt(45)) & 0xffff; + const ld: number[] = [u16_1]; + if (lt1 != 0) ld.push(u16_2); + if (lt2 != 0) ld.push(u16_3); + if (lt3 != 0) ld.push(u16_4); + const dv = new DataView(new ArrayBuffer(ld.length * 2)) + for(let i = 0; i < ld.length; i++) { + dv.setUint16(i * 2, ld[i]) + } + return dv.buffer; + } + + let nrOfFields = 0; + if(data["id"] !== null && data["id"] !== undefined ) { + nrOfFields++; + const str = new TextEncoder().encode(data["id"]); + const lengthBytes = getAutoGrowLength(str.byteLength); + const f = makeField(str.byteLength + lengthBytes.byteLength, 0, 21); + new Uint8Array(f.buffer, f.byteOffset).set(new Uint8Array(lengthBytes), 0); + new Uint8Array(f.buffer, f.byteOffset).set(str, lengthBytes.byteLength); + } + + if(data["type"] !== null && data["type"] !== undefined ) { + nrOfFields++; + const str = new TextEncoder().encode(data["type"]); + const lengthBytes = getAutoGrowLength(str.byteLength); + const f = makeField(str.byteLength + lengthBytes.byteLength, 1, 21); + new Uint8Array(f.buffer, f.byteOffset).set(new Uint8Array(lengthBytes), 0); + new Uint8Array(f.buffer, f.byteOffset).set(str, lengthBytes.byteLength); + } + + if(data["data"] !== null && data["data"] !== undefined ) { + nrOfFields++; + const str = new Uint8Array(data["data"]) + const lengthBytes = getAutoGrowLength(str.byteLength); + const f = makeField(str.byteLength + lengthBytes.byteLength, 2, 20); + new Uint8Array(f.buffer, f.byteOffset).set(new Uint8Array(lengthBytes), 0); + new Uint8Array(f.buffer, f.byteOffset).set(str, lengthBytes.byteLength); + } + + + const fieldsLength = fields.reduce((a, b) => b.byteLength + a, 0); + + + const result = new ArrayBuffer(fieldsLength + 2); + const resultC = new Uint8Array(result); + new DataView(result).setUint16(0, nrOfFields); + + let offset = 2; + for (const buf of fields) { + resultC.set(new Uint8Array(buf), offset); + offset += buf.byteLength; + } + + return new Uint8Array(result); +} + +export function deserialize_PushObjectRequest (data: Uint8Array): PushObjectRequest { + const view = new DataView(data.buffer, data.byteOffset) + let idx = 0; + const readAutoGrowLength = () => { + let result = BigInt(0); + const v1 = view.getUint16(idx); idx += 2; + const u16_1 = v1 & 0x7fff; + result = result | (BigInt(u16_1) << BigInt(0)); + if ((v1 & 0x8000) > 0) { + const v2 = view.getUint16(idx); idx += 2; + const u16_2 = v2 & 0x7fff; + result = result | (BigInt(u16_2) << BigInt(15)); + if ((v2 & 0x8000) > 0) { + const v3 = view.getUint16(idx); idx += 2; + const u16_3 = v3 & 0x7fff; + result = result | (BigInt(u16_3) << BigInt(30)); + if ((v3 & 0x8000) > 0) { + const v4 = view.getUint16(idx); idx += 2; + const u16_4 = v4; + result = result | (BigInt(u16_4) << BigInt(45)); + } + } + } + return Number(result); + } + let result: any = {} + const nrOfFields = view.getUint16(idx); idx += 2; + for(let i = 0; i < nrOfFields; i++) { + const fieldLabel = view.getUint16(idx); idx += 2; + const fieldType = view.getUint8(idx); idx += 1; + switch(fieldLabel) { + case 0: { + const fieldDataLength = readAutoGrowLength() + const fieldDataUint8 = data.slice(idx, idx + fieldDataLength); idx += fieldDataLength; + const str = new TextDecoder().decode(fieldDataUint8); + result["id"] = str; + break; + } + case 1: { + const fieldDataLength = readAutoGrowLength() + const fieldDataUint8 = data.slice(idx, idx + fieldDataLength); idx += fieldDataLength; + const str = new TextDecoder().decode(fieldDataUint8); + result["type"] = str; + break; + } + case 2: { + const fieldDataLength = readAutoGrowLength() + const fieldDataUint8 = data.slice(idx, idx + fieldDataLength); idx += fieldDataLength; + result["data"] = fieldDataUint8; + break; + } + default: + throw new Error("Invalid label found: " + fieldLabel) + break; + } + } + return result; +} \ No newline at end of file diff --git a/src/com/pushobjectresponse.ts b/src/com/pushobjectresponse.ts new file mode 100644 index 0000000..dd1b217 --- /dev/null +++ b/src/com/pushobjectresponse.ts @@ -0,0 +1,109 @@ +export default interface PushObjectResponse { + success: boolean; +} +export function serialize_PushObjectResponse (data: PushObjectResponse): Uint8Array { + let fields: ArrayBuffer[] = []; + const addBuffer = (buffer:ArrayBuffer) => fields.push(buffer); + const makeField = (payload_length: number, label: number, type: number) => { + let b = new ArrayBuffer(3 + payload_length); + new DataView(b).setUint16(0, label); + new DataView(b).setUint8(2, type); + fields.push(b); + return new DataView(b, 3); + } + + const makeArrayBuffer = (payload_length: number) => { + let b = new ArrayBuffer(payload_length); + fields.push(b) + return new DataView(b) + } + + const getAutoGrowLength = (length: number | bigint) => { + if (typeof length === "number") length = BigInt(length); + const lt1 = length >= BigInt(Math.pow(2, 15)) ? 0x8000 : 0; + const lt2 = length >= BigInt(Math.pow(2, 30)) ? 0x8000 : 0; + const lt3 = length >= BigInt(Math.pow(2, 45)) ? 0x8000 : 0; + const lt4 = length >= BigInt(Math.pow(2, 61)); + if (lt4) throw new Error("Payload to large!"); + let blen = BigInt(length); + const u16_1 = (Number(blen >> BigInt(0)) & 0x7fff) | lt1; + const u16_2 = (Number(blen >> BigInt(15)) & 0x7fff) | lt2; + const u16_3 = (Number(blen >> BigInt(30)) & 0x7fff) | lt3; + const u16_4 = Number(blen >> BigInt(45)) & 0xffff; + const ld: number[] = [u16_1]; + if (lt1 != 0) ld.push(u16_2); + if (lt2 != 0) ld.push(u16_3); + if (lt3 != 0) ld.push(u16_4); + const dv = new DataView(new ArrayBuffer(ld.length * 2)) + for(let i = 0; i < ld.length; i++) { + dv.setUint16(i * 2, ld[i]) + } + return dv.buffer; + } + + let nrOfFields = 0; + if(data["success"] !== null && data["success"] !== undefined ) { + nrOfFields++; + const f = makeField(1, 0, 5); + f.setUint8(0, Number(data["success"])); + } + + + const fieldsLength = fields.reduce((a, b) => b.byteLength + a, 0); + + + const result = new ArrayBuffer(fieldsLength + 2); + const resultC = new Uint8Array(result); + new DataView(result).setUint16(0, nrOfFields); + + let offset = 2; + for (const buf of fields) { + resultC.set(new Uint8Array(buf), offset); + offset += buf.byteLength; + } + + return new Uint8Array(result); +} + +export function deserialize_PushObjectResponse (data: Uint8Array): PushObjectResponse { + const view = new DataView(data.buffer, data.byteOffset) + let idx = 0; + const readAutoGrowLength = () => { + let result = BigInt(0); + const v1 = view.getUint16(idx); idx += 2; + const u16_1 = v1 & 0x7fff; + result = result | (BigInt(u16_1) << BigInt(0)); + if ((v1 & 0x8000) > 0) { + const v2 = view.getUint16(idx); idx += 2; + const u16_2 = v2 & 0x7fff; + result = result | (BigInt(u16_2) << BigInt(15)); + if ((v2 & 0x8000) > 0) { + const v3 = view.getUint16(idx); idx += 2; + const u16_3 = v3 & 0x7fff; + result = result | (BigInt(u16_3) << BigInt(30)); + if ((v3 & 0x8000) > 0) { + const v4 = view.getUint16(idx); idx += 2; + const u16_4 = v4; + result = result | (BigInt(u16_4) << BigInt(45)); + } + } + } + return Number(result); + } + let result: any = {} + const nrOfFields = view.getUint16(idx); idx += 2; + for(let i = 0; i < nrOfFields; i++) { + const fieldLabel = view.getUint16(idx); idx += 2; + const fieldType = view.getUint8(idx); idx += 1; + switch(fieldLabel) { + case 0: { + result["success"] = view.getUint8(idx); idx += 1; + break; + } + default: + throw new Error("Invalid label found: " + fieldLabel) + break; + } + } + return result; +} \ No newline at end of file diff --git a/src/com/sync_client.ts b/src/com/sync_client.ts index 0fff3bd..14219cb 100644 --- a/src/com/sync_client.ts +++ b/src/com/sync_client.ts @@ -4,6 +4,8 @@ import PullGetLogRequest, { serialize_PullGetLogRequest, deserialize_PullGetLogR import PullGetLogResponse, { serialize_PullGetLogResponse, deserialize_PullGetLogResponse} from "./pullgetlogresponse"; import PullObjectRequest, { serialize_PullObjectRequest, deserialize_PullObjectRequest} from "./pullobjectrequest"; import PullObjectResponse, { serialize_PullObjectResponse, deserialize_PullObjectResponse} from "./pullobjectresponse"; +import PushObjectRequest, { serialize_PushObjectRequest, deserialize_PushObjectRequest} from "./pushobjectrequest"; +import PushObjectResponse, { serialize_PushObjectResponse, deserialize_PushObjectResponse} from "./pushobjectresponse"; import ServiceMessage, { serialize_ServiceMessage, deserialize_ServiceMessage} from "./servicemessage"; export type { SyncInitRequest, @@ -12,6 +14,8 @@ export type { PullGetLogResponse, PullObjectRequest, PullObjectResponse, + PushObjectRequest, + PushObjectResponse, ServiceMessage, } import { IStream, Stream, Service, ServiceProvider, getRandomID } from "./service_client"; @@ -30,6 +34,10 @@ export class Sync extends Service { istream: false, input: deserialize_PullObjectRequest, rstream: false, ret: serialize_PullObjectResponse }); + this.functions.set("PushObject", { + istream: false, input: deserialize_PushObjectRequest, + rstream: false, ret: serialize_PushObjectResponse + }); } SyncInit(data: SyncInitRequest): Promise { @@ -88,4 +96,23 @@ export class Sync extends Service { }) } + + PushObject(data: PushObjectRequest): Promise { + const msgid = getRandomID(16); + return new Promise((resolve, reject)=>{ + this.calls.set(msgid, (msg)=>{ + this.calls.delete(msgid); + if(msg.error) reject(new Error(msg.error)) + else resolve(deserialize_PushObjectResponse(msg.payload)) + }) + this.sendMessage({ + service: "Sync", + function: "PushObject", + id: msgid, + payload: (serialize_PushObjectRequest(data)) as any, + error: undefined as any + }); + }) + + } } \ No newline at end of file diff --git a/src/com/sync_server.ts b/src/com/sync_server.ts index fa7c1a0..2d9ecdc 100644 --- a/src/com/sync_server.ts +++ b/src/com/sync_server.ts @@ -4,6 +4,8 @@ import PullGetLogRequest, { serialize_PullGetLogRequest, deserialize_PullGetLogR import PullGetLogResponse, { serialize_PullGetLogResponse, deserialize_PullGetLogResponse} from "./pullgetlogresponse"; import PullObjectRequest, { serialize_PullObjectRequest, deserialize_PullObjectRequest} from "./pullobjectrequest"; import PullObjectResponse, { serialize_PullObjectResponse, deserialize_PullObjectResponse} from "./pullobjectresponse"; +import PushObjectRequest, { serialize_PushObjectRequest, deserialize_PushObjectRequest} from "./pushobjectrequest"; +import PushObjectResponse, { serialize_PushObjectResponse, deserialize_PushObjectResponse} from "./pushobjectresponse"; import ServiceMessage, { serialize_ServiceMessage, deserialize_ServiceMessage} from "./servicemessage"; export type { SyncInitRequest, @@ -12,6 +14,8 @@ export type { PullGetLogResponse, PullObjectRequest, PullObjectResponse, + PushObjectRequest, + PushObjectResponse, ServiceMessage, } import { IStream, Service } from "./service_server"; @@ -31,6 +35,10 @@ export abstract class Sync extends Service { istream: false, input: deserialize_PullObjectRequest, rstream: false, ret: serialize_PullObjectResponse }); + this.functions.set("PushObject", { + istream: false, input: deserialize_PushObjectRequest, + rstream: false, ret: serialize_PushObjectResponse + }); } abstract SyncInit(data: SyncInitRequest, ctx: T): Promise; @@ -38,4 +46,6 @@ export abstract class Sync extends Service { abstract PullGetLog(data: PullGetLogRequest, ctx: T): Promise; abstract PullObject(data: PullObjectRequest, ctx: T): Promise; + + abstract PushObject(data: PushObjectRequest, ctx: T): Promise; } \ No newline at end of file diff --git a/src/sync.ts b/src/sync.ts index 4bd0885..a3bc237 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -4,9 +4,9 @@ import PullGetLogRequest from "./com/pullgetlogrequest"; import PullGetLogResponse from "./com/pullgetlogresponse"; import { ServiceProvider as ServiceProviderClient } from "./com/service_client"; import { ServiceProvider as ServiceProviderServer } from "./com/service_server"; -import { PullObjectRequest, PullObjectResponse, Sync as SyncClient, SyncInitRequest, SyncInitResponse } from "./com/sync_client"; +import { PullObjectRequest, PullObjectResponse, PushObjectRequest, PushObjectResponse, Sync as SyncClient, SyncInitRequest, SyncInitResponse } from "./com/sync_client"; import { Sync as SyncServer } from "./com/sync_server"; -import Repository from "./repo"; +import Repository, { ObjectTypes } from "./repo"; if (typeof btoa === 'undefined') { global.btoa = function (str) { @@ -84,13 +84,13 @@ export async function startSyncClient(stream: ISimpleStream, repo: Repository) { let head = await repo.readHead(); let remoteContainsHead = false; - let response = await service.SyncInit({ + let syncResponse = await service.SyncInit({ clientCommit: head?.id as string }) - if (response.needPull) { + if (syncResponse.needPull) { let commitsToDownload = new Set(); - let toAnalyse: string[] = [response.remoteCommit]; + let toAnalyse: string[] = [syncResponse.remoteCommit]; while (toAnalyse.length > 0) { let current = toAnalyse.pop() as string; @@ -168,18 +168,32 @@ export async function startSyncClient(stream: ISimpleStream, repo: Repository) { if (head) { if (remoteContainsHead) { - await repo.writeHead(response.remoteCommit); + await repo.writeHead(syncResponse.remoteCommit); } else { - let commit = await repo.mergeCommits(head.id, response.remoteCommit); + let commit = await repo.mergeCommits(head.id, syncResponse.remoteCommit); await repo.writeHead(commit); } - } else if (response.remoteCommit) { - await repo.writeHead(response.remoteCommit); + } else if (syncResponse.remoteCommit) { + await repo.writeHead(syncResponse.remoteCommit); } } - if (response.needPush) { + if (syncResponse.needPush) { //TODO: Push + // Steps: + // 1. Figure out missing commits + // 2. Push missing Objects of these commits + // 3. Push missing commits + let commitsToPush = new Set(); + let objectsToPush = new Set(); + + let localHead = await repo.readHead() + if (localHead) { // If there is nothing to push, don't push + // Find matching point + let match = undefined; + + if (!syncResponse.remoteCommit) { } + } } @@ -274,7 +288,18 @@ class SyncRemote extends SyncServer { } } - + async PushObject({ id, type, data }: PushObjectRequest): Promise { + if (type in ObjectTypes) { + let newID = await this.repo.writeObject(data, type as any) + if (newID !== id) //TODO: Maybe cleanup wrong object? + throw new Error("Invalid ID!"); + return { + success: true + } + } else { + throw new Error("Invalid type!"); + } + } } export async function startSyncRemote(stream: ISimpleStream, repo: Repository) { @@ -284,6 +309,5 @@ export async function startSyncRemote(stream: ISimpleStream, repo: Repository) { const sess = prov.getSession(stream.send.bind(stream)); stream.on("data", (data) => sess.onMessage(data).catch(no)); stream.on("close", () => { sess.close(); yes() }); - }) } \ No newline at end of file diff --git a/sync_proto.binc b/sync_proto.binc index 55fd812..6b4648d 100644 --- a/sync_proto.binc +++ b/sync_proto.binc @@ -36,8 +36,19 @@ type PullObjectResponse { data : bytes = 1; } +type PushObjectRequest { + id : string = 0; + type: string = 1; + data: bytes = 2; +} + +type PushObjectResponse { + success: boolean = 0; +} + service Sync { SyncInit(SyncInitRequest): SyncInitResponse; PullGetLog(PullGetLogRequest): PullGetLogResponse; PullObject(PullObjectRequest): PullObjectResponse; + PushObject(PushObjectRequest): PushObjectResponse; } \ No newline at end of file