SimpleVSC/src/sync.ts

356 lines
11 KiB
TypeScript

import RemoteCommit from "./com/commit";
import PullGetLogRequest from "./com/pullgetlogrequest";
import PullGetLogResponse from "./com/pullgetlogresponse";
import { ServiceProvider as ServiceProviderClient } from "./com/service_client";
import { ServiceProvider as ServiceProviderServer } from "./com/service_server";
import { PullObjectRequest, PullObjectResponse, PushObjectRequest, PushObjectResponse, Sync as SyncClient, SyncInitRequest, SyncInitResponse } from "./com/sync_client";
import { Sync as SyncServer } from "./com/sync_server";
import Repository, { Commit, ObjectTypes, TreeEntry } from "./repo";
if (typeof btoa === 'undefined') {
global.btoa = function (str) {
return Buffer.from(str, 'binary').toString('base64');
};
}
if (typeof atob === 'undefined') {
global.atob = function (b64Encoded) {
return Buffer.from(b64Encoded, 'base64').toString('binary');
};
}
export interface ISimpleStream {
on(type: "data", cb: (data: Uint8Array) => void): void;
on(type: "close", cb: () => void): void;
close(): void;
send(data: Uint8Array): void;
}
export function getCopyStreams(): [ISimpleStream, ISimpleStream] {
let cb1d: any = undefined;
let cb1c: any = undefined;
let cb2d: any = undefined;
let cb2c: any = undefined;
let s1: ISimpleStream = {
on: (type, cb) => {
if (type == "data") {
cb1d = cb;
} else if (type == "close") {
cb1c = cb;
}
},
close: () => {
if (cb2c)
cb2c();
},
send: data => {
if (cb2d)
cb2d(data);
}
}
let s2: ISimpleStream = {
on: (type, cb) => {
if (type == "data") {
cb2d = cb;
} else if (type == "close") {
cb2c = cb;
}
},
close: () => {
if (cb2c)
cb1c();
},
send: data => {
if (cb1d)
cb1d(data);
}
}
return [s1, s2];
}
export async function startSyncClient(stream: ISimpleStream, repo: Repository) {
const prov = new ServiceProviderClient(stream.send.bind(stream));
stream.on("data", chunk => prov.onPacket(chunk));
stream.on("close", () => { }) //TODO: Implement
const service = new SyncClient(prov);
let head = await repo.readHead();
let remoteContainsHead = false;
let syncResponse = await service.SyncInit({
clientCommit: head?.id as string
})
if (syncResponse.needPull) {
let commitsToDownload = new Set<string>();
let toAnalyse: string[] = [syncResponse.remoteCommit];
while (toAnalyse.length > 0) {
let current = toAnalyse.pop() as string;
let resp = await service.PullGetLog({
limit: 20,
start: current
});
for (const commit of resp.commits) {
if (head && commit.id == head.id) {
remoteContainsHead = true;
}
if (!await repo.hasCommit(commit.id)) {
commitsToDownload.add(commit.id);
if (commit.merge && !await repo.hasCommit(commit.merge)) {
commitsToDownload.add(commit.merge);
toAnalyse.push(commit.merge);
}
}
}
let last = resp.commits[resp.commits.length - 1];
if (last && last.before) {
toAnalyse.push(last.before);
}
}
const downloadTree = async (treeID: string) => {
let treeData = await service.PullObject({ id: treeID, type: "tree" });
if (!treeData.found)
throw new Error(`Tree with id ${treeID} not found!`);
let tree = repo.parseTree(treeData.data);
for (const [type, objID, name] of tree) {
if (!await repo.hasObject(objID)) {
if (type == "tree") {
await downloadTree(objID);
}
let res = await service.PullObject({
id: objID,
type
});
if (!res.found)
throw new Error(`Could not find Object with id ${objID} on remote`);
repo.writeObject(res.data, type);
}
}
}
for (const commitID of commitsToDownload) {
let commitData = await service.PullObject({ id: commitID, type: "comm" });
if (!commitData.found)
throw new Error(`Commit with id ${commitID} not on server!`);
let commit = repo.parseCommit(commitID, commitData.data);
if (!await repo.hasObject(commit.root)) {
await downloadTree(commit.root);
let res = await service.PullObject({
id: commit.root,
type: "tree"
});
if (!res.found)
throw new Error(`Could not find Object with id ${commit.root} on remote`);
repo.writeObject(res.data, "tree");
}
repo.writeObject(commitData.data, "comm");
}
if (head) {
if (remoteContainsHead) {
await repo.writeHead(syncResponse.remoteCommit);
} else {
let commit = await repo.mergeCommits(head.id, syncResponse.remoteCommit);
await repo.writeHead(commit);
}
} else if (syncResponse.remoteCommit) {
await repo.writeHead(syncResponse.remoteCommit);
}
}
if (syncResponse.needPush) {
//TODO: Push
// Steps:
// 1. Figure out missing commits
// 2. Push missing Objects of these commits
// 3. Push missing commits
let commitsToPush = new Set<string>();
let objectsToPush = new Set<string>();
let localHead = await repo.readHead()
if (localHead) { // If there is nothing to push, don't push
let toCheck: Commit[] = [localHead];
let commit: Commit;
while(toCheck.length > 0) {
commit = toCheck.shift() as Commit;
if(commit.id != syncResponse.remoteCommit) { //Not yet at remote commit. Wait
commitsToPush.add(commit.id);
if(commit.before && !commitsToPush.has(commit.before)) // A commit could already be checked by the other part of a merge
toCheck.push(await repo.readCommit(commit.before));
if(commit.merge && !commitsToPush.has(commit.merge))
toCheck.push(await repo.readCommit(commit.merge));
}
}
const traverseTree =async (current: TreeEntry[]) => {
for(const [type, hash, name] of current) {
objectsToPush.add(hash);
if(type =="tree") {
let tree = await repo.readTree(hash);
await traverseTree(tree);
}
}
}
for(const commitId of commitsToPush) {
let commit = await repo.readCommit(commitId);
let rootId = commit.root
objectsToPush.add(rootId);
if(objectsToPush.has(rootId)) {
objectsToPush.add(rootId);
let root = await repo.readTree(rootId);
await traverseTree(root);
}
}
}
for(const objId of objectsToPush) {
let obj = await repo.readObjectTyped(objId)
await service.PushObject({
id: objId,
data: obj?.data as Uint8Array,
type: obj?.type as string
});
}
}
return stream.close();
}
class SyncRemote extends SyncServer<never> {
repo: Repository;
constructor(repo: Repository) {
super();
this.repo = repo;
}
async PullGetLog({ limit, start }: PullGetLogRequest): Promise<PullGetLogResponse> {
let log = await this.repo.readCommitLog();
let startIdx = 0;
if (start) {
startIdx = log.findIndex(e => e.id == start);
if (startIdx < 0) {
throw new Error("Invalid start commit ID!");
}
}
return {
commits: log.slice(startIdx, startIdx + limit).map(({ id, root, before, merge, date, }) => ({ id, root, before, merge, date: date.valueOf() }))
}
}
async PullObject({ id, type }: PullObjectRequest): Promise<PullObjectResponse> {
let res = await this.repo.readObjectTyped(id);
if (!res || res.type !== type) {
return {
found: false,
data: undefined as any
}
} else {
return {
found: true,
data: res.data
}
}
}
async SyncInit({ clientCommit }: SyncInitRequest): Promise<SyncInitResponse> {
let head = await this.repo.readHead();
if (!head) {
if (!clientCommit) {
return {
needPull: false,
needPush: false,
remoteCommit: undefined as never as string
};
} else {
return {
needPull: false,
needPush: true,
remoteCommit: undefined as never as string
}
}
} else {
if (head.id == clientCommit) {
return {
needPull: false,
needPush: false,
remoteCommit: head.id
};
} else {
if (!clientCommit) {
return {
needPull: true,
needPush: false,
remoteCommit: head.id
};
} else {
let log = await this.repo.readCommitLog(clientCommit);
if (log[log.length - 1].id == clientCommit) {
return {
needPull: true,
needPush: false,
remoteCommit: head.id
};
} else {
return {
needPull: true, // More a could need pull
needPush: true,
remoteCommit: head.id
};
}
}
}
}
}
async PushObject({ id, type, data }: PushObjectRequest): Promise<PushObjectResponse> {
if (type in ObjectTypes) {
let newID = await this.repo.writeObject(data, type as any)
if (newID !== id) //TODO: Maybe cleanup wrong object?
throw new Error("Invalid ID!");
return {
success: true
}
} else {
throw new Error("Invalid type!");
}
}
}
export async function startSyncRemote(stream: ISimpleStream, repo: Repository) {
return new Promise<void>((yes, no) => {
const prov = new ServiceProviderServer();
prov.addService(new SyncRemote(repo));
const sess = prov.getSession(stream.send.bind(stream));
stream.on("data", (data) => sess.onMessage(data).catch(no));
stream.on("close", () => { sess.close(); yes() });
})
}