Starting on one-way sync

This commit is contained in:
user
2021-08-11 17:19:57 +02:00
parent f3f3d250b4
commit 247b7854aa
17 changed files with 2065 additions and 55 deletions

289
src/sync.ts Normal file
View File

@ -0,0 +1,289 @@
import RemoteCommit from "./com/commit";
import PullGetLogRequest from "./com/pullgetlogrequest";
import PullGetLogResponse from "./com/pullgetlogresponse";
import { ServiceProvider as ServiceProviderClient } from "./com/service_client";
import { ServiceProvider as ServiceProviderServer } from "./com/service_server";
import { PullObjectRequest, PullObjectResponse, Sync as SyncClient, SyncInitRequest, SyncInitResponse } from "./com/sync_client";
import { Sync as SyncServer } from "./com/sync_server";
import Repository from "./repo";
if (typeof btoa === 'undefined') {
global.btoa = function (str) {
return Buffer.from(str, 'binary').toString('base64');
};
}
if (typeof atob === 'undefined') {
global.atob = function (b64Encoded) {
return Buffer.from(b64Encoded, 'base64').toString('binary');
};
}
export interface ISimpleStream {
on(type: "data", cb: (data: Uint8Array) => void): void;
on(type: "close", cb: () => void): void;
close(): void;
send(data: Uint8Array): void;
}
export function getCopyStreams(): [ISimpleStream, ISimpleStream] {
let cb1d: any = undefined;
let cb1c: any = undefined;
let cb2d: any = undefined;
let cb2c: any = undefined;
let s1: ISimpleStream = {
on: (type, cb) => {
if (type == "data") {
cb1d = cb;
} else if (type == "close") {
cb1c = cb;
}
},
close: () => {
if (cb2c)
cb2c();
},
send: data => {
if (cb2d)
cb2d(data);
}
}
let s2: ISimpleStream = {
on: (type, cb) => {
if (type == "data") {
cb2d = cb;
} else if (type == "close") {
cb2c = cb;
}
},
close: () => {
if (cb2c)
cb1c();
},
send: data => {
if (cb1d)
cb1d(data);
}
}
return [s1, s2];
}
export async function startSyncClient(stream: ISimpleStream, repo: Repository) {
const prov = new ServiceProviderClient(stream.send.bind(stream));
stream.on("data", chunk => prov.onPacket(chunk));
stream.on("close", () => { }) //TODO: Implement
const service = new SyncClient(prov);
let head = await repo.readHead();
let remoteContainsHead = false;
let response = await service.SyncInit({
clientCommit: head?.id as string
})
if (response.needPull) {
let commitsToDownload = new Set<string>();
let toAnalyse: string[] = [response.remoteCommit];
while (toAnalyse.length > 0) {
let current = toAnalyse.pop() as string;
let resp = await service.PullGetLog({
limit: 20,
start: current
});
for (const commit of resp.commits) {
if (head && commit.id == head.id) {
remoteContainsHead = true;
}
if (!await repo.hasCommit(commit.id)) {
commitsToDownload.add(commit.id);
if (commit.merge && !await repo.hasCommit(commit.merge)) {
commitsToDownload.add(commit.merge);
toAnalyse.push(commit.merge);
}
}
}
let last = resp.commits[resp.commits.length - 1];
if (last && last.before) {
toAnalyse.push(last.before);
}
}
const downloadTree = async (treeID: string) => {
let treeData = await service.PullObject({ id: treeID, type: "tree" });
if (!treeData.found)
throw new Error(`Tree with id ${treeID} not found!`);
let tree = repo.parseTree(treeData.data);
for (const [type, objID, name] of tree) {
if (!await repo.hasObject(objID)) {
if (type == "tree") {
await downloadTree(objID);
}
let res = await service.PullObject({
id: objID,
type
});
if (!res.found)
throw new Error(`Could not find Object with id ${objID} on remote`);
repo.writeObject(res.data, type);
}
}
}
for (const commitID of commitsToDownload) {
let commitData = await service.PullObject({ id: commitID, type: "comm" });
if (!commitData.found)
throw new Error(`Commit with id ${commitID} not on server!`);
let commit = repo.parseCommit(commitID, commitData.data);
if (!await repo.hasObject(commit.root)) {
await downloadTree(commit.root);
let res = await service.PullObject({
id: commit.root,
type: "tree"
});
if (!res.found)
throw new Error(`Could not find Object with id ${commit.root} on remote`);
repo.writeObject(res.data, "tree");
}
repo.writeObject(commitData.data, "comm");
}
if (head) {
if (remoteContainsHead) {
await repo.writeHead(response.remoteCommit);
} else {
let commit = await repo.mergeCommits(head.id, response.remoteCommit);
await repo.writeHead(commit);
}
} else if (response.remoteCommit) {
await repo.writeHead(response.remoteCommit);
}
}
if (response.needPush) {
//TODO: Push
}
return stream.close();
}
class SyncRemote extends SyncServer<never> {
repo: Repository;
constructor(repo: Repository) {
super();
this.repo = repo;
}
async PullGetLog({ limit, start }: PullGetLogRequest): Promise<PullGetLogResponse> {
let log = await this.repo.readCommitLog();
let startIdx = 0;
if (start) {
startIdx = log.findIndex(e => e.id == start);
if (startIdx < 0) {
throw new Error("Invalid start commit ID!");
}
}
return {
commits: log.slice(startIdx, startIdx + limit).map(({ id, root, before, merge, date, }) => ({ id, root, before, merge, date: date.valueOf() }))
}
}
async PullObject({ id, type }: PullObjectRequest): Promise<PullObjectResponse> {
let res = await this.repo.readObjectTyped(id);
if (!res || res.type !== type) {
return {
found: false,
data: undefined as any
}
} else {
return {
found: true,
data: res.data
}
}
}
async SyncInit({ clientCommit }: SyncInitRequest): Promise<SyncInitResponse> {
let head = await this.repo.readHead();
if (!head) {
if (!clientCommit) {
return {
needPull: false,
needPush: false,
remoteCommit: undefined as never as string
};
} else {
return {
needPull: false,
needPush: true,
remoteCommit: undefined as never as string
}
}
} else {
if (head.id == clientCommit) {
return {
needPull: false,
needPush: false,
remoteCommit: head.id
};
} else {
if (!clientCommit) {
return {
needPull: true,
needPush: false,
remoteCommit: head.id
};
} else {
let log = await this.repo.readCommitLog(clientCommit);
if (log[log.length - 1].id == clientCommit) {
return {
needPull: true,
needPush: false,
remoteCommit: head.id
};
} else {
return {
needPull: true, // More a could need pull
needPush: true,
remoteCommit: head.id
};
}
}
}
}
}
}
export async function startSyncRemote(stream: ISimpleStream, repo: Repository) {
return new Promise<void>((yes, no) => {
const prov = new ServiceProviderServer();
prov.addService(new SyncRemote(repo));
const sess = prov.getSession(stream.send.bind(stream));
stream.on("data", (data) => sess.onMessage(data).catch(no));
stream.on("close", () => { sess.close(); yes() });
})
}