import RemoteCommit from "./com/commit";
import PullGetLogRequest from "./com/pullgetlogrequest";
import PullGetLogResponse from "./com/pullgetlogresponse";
import { ServiceProvider as ServiceProviderClient } from "./com/service_client";
import { ServiceProvider as ServiceProviderServer } from "./com/service_server";
import {
  PullObjectRequest,
  PullObjectResponse,
  PushObjectRequest,
  PushObjectResponse,
  Sync as SyncClient,
  SyncInitRequest,
  SyncInitResponse
} from "./com/sync_client";
import { Sync as SyncServer } from "./com/sync_server";
import Repository, { Commit, ObjectTypes, TreeEntry } from "./repo";

// Install base64 helpers on the global object when the runtime does not provide them.
if (typeof btoa === 'undefined') {
  global.btoa = function (str) {
    return Buffer.from(str, 'binary').toString('base64');
  };
}

if (typeof atob === 'undefined') {
  global.atob = function (b64Encoded) {
    return Buffer.from(b64Encoded, 'base64').toString('binary');
  };
}

/**
 * Minimal duplex byte stream used as the transport for the sync protocol.
 */
export interface ISimpleStream {
  on(type: "data", cb: (data: Uint8Array) => void): void;
  on(type: "close", cb: () => void): void;
  close(): void;
  send(data: Uint8Array): void;
}

/**
 * Creates two in-memory streams that are wired to each other.
 *
 * Data sent on one stream is handed synchronously to the "data" listener of
 * the other stream; closing one stream invokes the "close" listener of the
 * other. Each stream remembers only the most recently registered listener per
 * event type, and events without a registered listener are dropped.
 *
 * @returns The two connected stream endpoints.
 */
export function getCopyStreams(): [ISimpleStream, ISimpleStream] {
  let firstOnData: any = undefined;
  let firstOnClose: any = undefined;
  let secondOnData: any = undefined;
  let secondOnClose: any = undefined;

  const first: ISimpleStream = {
    on: (type, cb) => {
      if (type === "data") {
        firstOnData = cb;
      } else if (type === "close") {
        firstOnClose = cb;
      }
    },
    close: () => {
      if (secondOnClose) secondOnClose();
    },
    send: data => {
      if (secondOnData) secondOnData(data);
    }
  };

  const second: ISimpleStream = {
    on: (type, cb) => {
      if (type === "data") {
        secondOnData = cb;
      } else if (type === "close") {
        secondOnClose = cb;
      }
    },
    close: () => {
      // Guard on the listener that is actually invoked (the peer's close listener).
      if (firstOnClose) firstOnClose();
    },
    send: data => {
      if (firstOnData) firstOnData(data);
    }
  };

  return [first, second];
}

/**
 * Downloads every commit (and the objects its tree references) that the
 * remote has and the local repository lacks, then moves the local head.
 *
 * @param service Client stub used to talk to the remote.
 * @param repo Local repository that receives the objects.
 * @param headId Id of the local head commit, or undefined without a local head.
 * @param remoteCommit Id of the remote head commit.
 * @throws Error when the remote reports a requested commit, tree or object as not found.
 */
async function pullFromRemote(
  service: SyncClient,
  repo: Repository,
  headId: string | undefined,
  remoteCommit: string
): Promise<void> {
  const commitsToDownload = new Set<string>();
  const analysed = new Set<string>();
  const toAnalyse: string[] = [remoteCommit];
  let remoteContainsHead = false;

  // Walk the remote log in pages of 20, following both `before` and `merge` links.
  while (toAnalyse.length > 0) {
    const current = toAnalyse.pop() as string;
    // Merge branches can converge on the same commit; do not fetch a page twice.
    if (analysed.has(current)) continue;
    analysed.add(current);

    const resp = await service.PullGetLog({ limit: 20, start: current });
    for (const commit of resp.commits) {
      if (headId !== undefined && commit.id === headId) {
        remoteContainsHead = true;
      }
      if (!await repo.hasCommit(commit.id)) {
        commitsToDownload.add(commit.id);
        if (commit.merge && !await repo.hasCommit(commit.merge)) {
          commitsToDownload.add(commit.merge);
          toAnalyse.push(commit.merge);
        }
      }
    }

    // Continue with the parent of the last commit on this page, if any.
    const last = resp.commits[resp.commits.length - 1];
    if (last && last.before) {
      toAnalyse.push(last.before);
    }
  }

  // Fetches a tree, stores all of its missing children first and then the tree itself.
  const downloadTree = async (treeID: string): Promise<void> => {
    const treeData = await service.PullObject({ id: treeID, type: "tree" });
    if (!treeData.found)
      throw new Error(`Tree with id ${treeID} not found!`);

    for (const [type, objID] of repo.parseTree(treeData.data)) {
      if (await repo.hasObject(objID)) continue;
      if (type === "tree") {
        await downloadTree(objID);
      } else {
        const res = await service.PullObject({ id: objID, type });
        if (!res.found)
          throw new Error(`Could not find Object with id ${objID} on remote`);
        await repo.writeObject(res.data, type);
      }
    }

    await repo.writeObject(treeData.data, "tree");
  };

  for (const commitID of commitsToDownload) {
    const commitData = await service.PullObject({ id: commitID, type: "comm" });
    if (!commitData.found)
      throw new Error(`Commit with id ${commitID} not on server!`);

    const commit = repo.parseCommit(commitID, commitData.data);
    if (!await repo.hasObject(commit.root)) {
      await downloadTree(commit.root);
    }
    await repo.writeObject(commitData.data, "comm");
  }

  if (headId !== undefined) {
    if (remoteContainsHead) {
      // The remote history includes the local head: adopt the remote head directly.
      await repo.writeHead(remoteCommit);
    } else {
      // Histories diverged: merge the local head with the remote head.
      const merged = await repo.mergeCommits(headId, remoteCommit);
      await repo.writeHead(merged);
    }
  } else if (remoteCommit) {
    await repo.writeHead(remoteCommit);
  }
}

/**
 * Pushes the objects reachable from the trees of all local commits that are
 * visited before the remote head is reached.
 *
 * TODO: Push is incomplete - the commit objects themselves are not sent and
 * the remote head is not updated.
 *
 * @param service Client stub used to talk to the remote.
 * @param repo Local repository the objects are read from.
 * @param remoteCommit Id of the remote head commit; history traversal stops there.
 */
async function pushToRemote(
  service: SyncClient,
  repo: Repository,
  remoteCommit: string
): Promise<void> {
  // Steps:
  // 1. Figure out missing commits
  // 2. Push missing Objects of these commits
  // 3. Push missing commits
  const commitsToPush = new Set<string>();
  const objectsToPush = new Set<string>();

  const localHead = await repo.readHead();
  if (localHead) { // If there is nothing to push, don't push
    const toCheck: Commit[] = [localHead];
    while (toCheck.length > 0) {
      const commit = toCheck.shift() as Commit;
      if (commit.id === remoteCommit) continue; // Reached the remote head on this path

      commitsToPush.add(commit.id);
      // A commit could already be checked by the other part of a merge
      if (commit.before && !commitsToPush.has(commit.before))
        toCheck.push(await repo.readCommit(commit.before));
      if (commit.merge && !commitsToPush.has(commit.merge))
        toCheck.push(await repo.readCommit(commit.merge));
    }

    // Records a tree and everything below it; trees that were already recorded are skipped.
    const collectTree = async (treeId: string): Promise<void> => {
      if (objectsToPush.has(treeId)) return;
      objectsToPush.add(treeId);
      for (const [type, hash] of await repo.readTree(treeId)) {
        if (type === "tree") {
          await collectTree(hash);
        } else {
          objectsToPush.add(hash);
        }
      }
    };

    for (const commitId of commitsToPush) {
      const commit = await repo.readCommit(commitId);
      await collectTree(commit.root);
    }
  }

  for (const objId of objectsToPush) {
    const obj = await repo.readObjectTyped(objId);
    await service.PushObject({
      id: objId,
      data: obj?.data as Uint8Array,
      type: obj?.type as string
    });
  }
}

/**
 * Runs the client side of a sync over the given stream: asks the remote what
 * is needed, pulls and/or pushes accordingly and closes the stream afterwards.
 *
 * The stream is closed even when the sync fails, so the peer is not left
 * waiting for a "close" event; the error is still propagated to the caller.
 *
 * @param stream Transport connected to a sync remote.
 * @param repo Local repository to synchronise.
 */
export async function startSyncClient(stream: ISimpleStream, repo: Repository) {
  const prov = new ServiceProviderClient(stream.send.bind(stream));
  stream.on("data", chunk => prov.onPacket(chunk));
  stream.on("close", () => { }) //TODO: Implement

  const service = new SyncClient(prov);
  try {
    const head = await repo.readHead();

    const syncResponse = await service.SyncInit({
      clientCommit: head?.id as string
    });

    if (syncResponse.needPull) {
      await pullFromRemote(service, repo, head?.id, syncResponse.remoteCommit);
    }

    if (syncResponse.needPush) {
      await pushToRemote(service, repo, syncResponse.remoteCommit);
    }
  } finally {
    stream.close();
  }
}

/**
 * Server-side implementation of the sync service, backed by a repository.
 */
class SyncRemote extends SyncServer {
  repo: Repository;

  constructor(repo: Repository) {
    super();
    this.repo = repo;
  }

  /**
   * Returns up to `limit` commits of the repository's commit log, beginning
   * at the commit with id `start` (or at the first log entry without `start`).
   *
   * @throws Error when `start` is given but not part of the log.
   */
  async PullGetLog({ limit, start }: PullGetLogRequest): Promise<PullGetLogResponse> {
    const log = await this.repo.readCommitLog();
    let startIdx = 0;
    if (start) {
      startIdx = log.findIndex(e => e.id === start);
      if (startIdx < 0) {
        throw new Error("Invalid start commit ID!");
      }
    }

    return {
      commits: log
        .slice(startIdx, startIdx + limit)
        .map(({ id, root, before, merge, date }) => ({
          id,
          root,
          before,
          merge,
          date: date.valueOf()
        }))
    };
  }

  /**
   * Looks up an object by id; reports it as not found when it is missing or
   * stored under a different type than requested.
   */
  async PullObject({ id, type }: PullObjectRequest): Promise<PullObjectResponse> {
    const res = await this.repo.readObjectTyped(id);

    if (!res || res.type !== type) {
      return {
        found: false,
        data: undefined as any
      };
    }

    return {
      found: true,
      data: res.data
    };
  }

  /**
   * Compares the client's head commit with the repository head and tells the
   * client whether it has to pull, push, or both.
   */
  async SyncInit({ clientCommit }: SyncInitRequest): Promise<SyncInitResponse> {
    const head = await this.repo.readHead();

    if (!head) {
      // Nothing on the remote: the client only needs to push if it has a commit.
      return {
        needPull: false,
        needPush: Boolean(clientCommit),
        remoteCommit: undefined as never as string
      };
    }

    if (head.id === clientCommit) {
      return {
        needPull: false,
        needPush: false,
        remoteCommit: head.id
      };
    }

    if (!clientCommit) {
      return {
        needPull: true,
        needPush: false,
        remoteCommit: head.id
      };
    }

    // NOTE(review): presumably readCommitLog(clientCommit) yields the log ending
    // at clientCommit when that commit is part of this history - confirm.
    const log = await this.repo.readCommitLog(clientCommit);
    // An empty log is treated like a diverged history instead of crashing.
    const logEndsAtClientCommit =
      log.length > 0 && log[log.length - 1].id === clientCommit;

    return {
      needPull: true, // More a could need pull when histories diverged
      needPush: !logEndsAtClientCommit,
      remoteCommit: head.id
    };
  }

  /**
   * Stores an object sent by the client and verifies that the id computed by
   * the repository matches the id the client claimed.
   *
   * @throws Error when the type is not a key of ObjectTypes or the ids differ.
   */
  async PushObject({ id, type, data }: PushObjectRequest): Promise<PushObjectResponse> {
    if (!(type in ObjectTypes)) {
      throw new Error("Invalid type!");
    }

    const newID = await this.repo.writeObject(data, type as any);
    if (newID !== id) //TODO: Maybe cleanup wrong object?
      throw new Error("Invalid ID!");

    return {
      success: true
    };
  }
}

/**
 * Serves the sync protocol for `repo` over `stream`.
 *
 * @param stream Transport connected to a sync client.
 * @param repo Repository exposed to the client.
 * @returns A promise that resolves once the stream is closed and rejects when
 *          handling an incoming message fails.
 */
export async function startSyncRemote(stream: ISimpleStream, repo: Repository) {
  return new Promise<void>((yes, no) => {
    const prov = new ServiceProviderServer();
    prov.addService(new SyncRemote(repo));
    const sess = prov.getSession(stream.send.bind(stream));
    stream.on("data", (data) => sess.onMessage(data).catch(no));
    stream.on("close", () => {
      sess.close();
      yes();
    });
  });
}