356 lines
11 KiB
TypeScript
356 lines
11 KiB
TypeScript
|
|
import RemoteCommit from "./com/commit";
|
|
import PullGetLogRequest from "./com/pullgetlogrequest";
|
|
import PullGetLogResponse from "./com/pullgetlogresponse";
|
|
import { ServiceProvider as ServiceProviderClient } from "./com/service_client";
|
|
import { ServiceProvider as ServiceProviderServer } from "./com/service_server";
|
|
import { PullObjectRequest, PullObjectResponse, PushObjectRequest, PushObjectResponse, Sync as SyncClient, SyncInitRequest, SyncInitResponse } from "./com/sync_client";
|
|
import { Sync as SyncServer } from "./com/sync_server";
|
|
import Repository, { Commit, ObjectTypes, TreeEntry } from "./repo";
|
|
|
|
// Fallback for runtimes without a global `btoa`: install one backed by Buffer.
if (typeof btoa === 'undefined') {
  global.btoa = (str) => {
    // 'binary' maps every character of the input string to a single byte.
    const bytes = Buffer.from(str, 'binary');
    return bytes.toString('base64');
  };
}
|
|
|
|
// Fallback for runtimes without a global `atob`: install one backed by Buffer.
if (typeof atob === 'undefined') {
  global.atob = (b64Encoded) => {
    // Decode the Base64 text to bytes, then expose each byte as one character.
    const bytes = Buffer.from(b64Encoded, 'base64');
    return bytes.toString('binary');
  };
}
|
|
|
|
/**
 * Minimal bidirectional byte-stream abstraction used by the sync client and
 * the sync remote in this file.
 */
export interface ISimpleStream {
  /** Sends one chunk of bytes through the stream. */
  send(data: Uint8Array): void;

  /** Closes the stream. */
  close(): void;

  /** Registers the callback invoked with each received chunk. */
  on(type: "data", cb: (data: Uint8Array) => void): void;

  /** Registers the callback invoked for the "close" event. */
  on(type: "close", cb: () => void): void;
}
|
|
|
|
/**
 * Creates two in-memory streams that are wired to each other: whatever one
 * endpoint sends is handed synchronously to the "data" handler of the other,
 * and closing one endpoint invokes the "close" handler of the other.
 *
 * Each endpoint keeps at most one handler per event; registering again
 * replaces the previous handler. Data sent (or a close issued) while the peer
 * has no matching handler registered is silently dropped.
 *
 * @returns A tuple `[s1, s2]` of the two connected endpoints.
 */
export function getCopyStreams(): [ISimpleStream, ISimpleStream] {
  type DataHandler = (data: Uint8Array) => void;
  type CloseHandler = () => void;
  interface Handlers {
    data?: DataHandler;
    close?: CloseHandler;
  }

  // Builds one endpoint: registrations go into `own`, while send/close are
  // delivered to whatever the peer registered in `peer`.
  const createEndpoint = (own: Handlers, peer: Handlers): ISimpleStream => ({
    on: (type: "data" | "close", cb: DataHandler | CloseHandler): void => {
      if (type === "data") {
        own.data = cb as DataHandler;
      } else if (type === "close") {
        own.close = cb as CloseHandler;
      }
    },
    // Closing notifies the *peer's* close handler. The previous code guarded
    // on the wrong callback for the second endpoint, which either threw or
    // skipped the notification.
    close: () => {
      peer.close?.();
    },
    send: (data) => {
      peer.data?.(data);
    },
  });

  const handlers1: Handlers = {};
  const handlers2: Handlers = {};

  return [
    createEndpoint(handlers1, handlers2),
    createEndpoint(handlers2, handlers1),
  ];
}
|
|
|
|
|
|
/**
 * Runs the client side of a sync session over `stream` against `repo`.
 *
 * Flow, as implemented below:
 *  1. Send the local head id to the remote via `SyncInit`.
 *  2. If the remote answers `needPull`: page through the remote commit log,
 *     download every commit (and the trees/objects below it) that is missing
 *     locally, then either move the local head to the remote commit (when the
 *     local head was seen in the remote log, or there is no local head) or
 *     create a merge commit of both heads.
 *  3. If the remote answers `needPush`: collect the trees/objects reachable
 *     from local commits down to the remote commit and send them with
 *     `PushObject`.
 *  4. Close the stream.
 *
 * NOTE(review): the push path only transfers tree entries; the commit objects
 * themselves and the remote head are not pushed yet (see TODO below).
 *
 * @param stream Transport used to talk to the remote; closed on success.
 * @param repo   Local repository that is read from and written to.
 * @throws Error when the remote reports a requested commit/tree/object as not
 *         found, or when an object scheduled for pushing cannot be read locally.
 */
export async function startSyncClient(stream: ISimpleStream, repo: Repository): Promise<void> {
  const prov = new ServiceProviderClient(stream.send.bind(stream));
  stream.on("data", chunk => prov.onPacket(chunk));

  stream.on("close", () => { }) //TODO: Implement

  const service = new SyncClient(prov);
  const head = await repo.readHead();
  // Becomes true when the local head id shows up in the remote log.
  let remoteContainsHead = false;

  const syncResponse = await service.SyncInit({
    clientCommit: head?.id as string
  });

  if (syncResponse.needPull) {
    const commitsToDownload = new Set<string>();
    // Log start points still to be requested; merge parents add new entries.
    const toAnalyse: string[] = [syncResponse.remoteCommit];

    while (toAnalyse.length > 0) {
      const current = toAnalyse.pop() as string;

      const resp = await service.PullGetLog({
        limit: 20,
        start: current
      });

      for (const commit of resp.commits) {
        if (head && commit.id == head.id) {
          remoteContainsHead = true;
        }
        if (!await repo.hasCommit(commit.id)) {
          commitsToDownload.add(commit.id);
          if (commit.merge && !await repo.hasCommit(commit.merge)) {
            commitsToDownload.add(commit.merge);
            toAnalyse.push(commit.merge);
          }
        }
      }

      // Continue with the next page, starting at the parent of the last entry.
      const last = resp.commits[resp.commits.length - 1];
      if (last && last.before) {
        toAnalyse.push(last.before);
      }
    }

    // Downloads the tree `treeID`, every entry below it that is missing
    // locally, and finally stores the tree itself (children first). The tree
    // payload fetched here is reused for the write instead of being requested
    // a second time.
    const downloadTree = async (treeID: string): Promise<void> => {
      const treeData = await service.PullObject({ id: treeID, type: "tree" });
      if (!treeData.found)
        throw new Error(`Tree with id ${treeID} not found!`);

      const tree = repo.parseTree(treeData.data);
      for (const [type, objID] of tree) {
        if (await repo.hasObject(objID)) continue;

        if (type == "tree") {
          await downloadTree(objID);
          continue;
        }

        const res = await service.PullObject({
          id: objID,
          type
        });

        if (!res.found)
          throw new Error(`Could not find Object with id ${objID} on remote`);

        // Awaited so a failed write surfaces here and the head is never moved
        // before the objects it depends on are stored.
        await repo.writeObject(res.data, type);
      }

      await repo.writeObject(treeData.data, "tree");
    };

    for (const commitID of commitsToDownload) {
      const commitData = await service.PullObject({ id: commitID, type: "comm" });
      if (!commitData.found)
        throw new Error(`Commit with id ${commitID} not on server!`);

      const commit = repo.parseCommit(commitID, commitData.data);

      if (!await repo.hasObject(commit.root)) {
        await downloadTree(commit.root);
      }

      await repo.writeObject(commitData.data, "comm");
    }

    if (head) {
      if (remoteContainsHead) {
        // Local head is part of the remote log: just move to the remote commit.
        await repo.writeHead(syncResponse.remoteCommit);
      } else {
        const commit = await repo.mergeCommits(head.id, syncResponse.remoteCommit);
        await repo.writeHead(commit);
      }
    } else if (syncResponse.remoteCommit) {
      await repo.writeHead(syncResponse.remoteCommit);
    }
  }

  if (syncResponse.needPush) {
    //TODO: Push
    // Steps:
    // 1. Figure out missing commits
    // 2. Push missing Objects of these commits
    // 3. Push missing commits
    const commitsToPush = new Set<string>();
    const objectsToPush = new Set<string>();

    const localHead = await repo.readHead();
    if (localHead) { // If there is nothing to push, don't push
      const toCheck: Commit[] = [localHead];

      while (toCheck.length > 0) {
        const commit = toCheck.shift() as Commit;
        if (commit.id != syncResponse.remoteCommit) { //Not yet at remote commit. Wait
          commitsToPush.add(commit.id);
          if (commit.before && !commitsToPush.has(commit.before)) // A commit could already be checked by the other part of a merge
            toCheck.push(await repo.readCommit(commit.before));

          if (commit.merge && !commitsToPush.has(commit.merge))
            toCheck.push(await repo.readCommit(commit.merge));
        }
      }

      // Records every entry of `current` and descends into sub-trees. Entries
      // that are already recorded are skipped: they were traversed when added.
      const traverseTree = async (current: TreeEntry[]): Promise<void> => {
        for (const [type, hash] of current) {
          if (objectsToPush.has(hash)) continue;
          objectsToPush.add(hash);
          if (type == "tree") {
            await traverseTree(await repo.readTree(hash));
          }
        }
      };

      for (const commitId of commitsToPush) {
        const commit = await repo.readCommit(commitId);
        const rootId = commit.root;

        // Only walk a root tree the first time it is encountered.
        if (!objectsToPush.has(rootId)) {
          objectsToPush.add(rootId);
          await traverseTree(await repo.readTree(rootId));
        }
      }
    }

    for (const objId of objectsToPush) {
      const obj = await repo.readObjectTyped(objId);
      if (!obj)
        throw new Error(`Object with id ${objId} not found in local repository`);

      await service.PushObject({
        id: objId,
        data: obj.data as Uint8Array,
        type: obj.type as string
      });
    }
  }

  return stream.close();
}
|
|
|
|
/**
 * Server-side implementation of the sync service: answers the requests issued
 * by the sync client using the given repository.
 */
class SyncRemote extends SyncServer<never> {
  /** Repository all requests are served from / written to. */
  repo: Repository;

  constructor(repo: Repository) {
    super();
    this.repo = repo;
  }

  /**
   * Returns up to `limit` entries of the repository's commit log, beginning at
   * the commit with id `start` (or at the first log entry when `start` is falsy).
   *
   * @throws Error if `start` is given but not present in the commit log.
   */
  async PullGetLog({ limit, start }: PullGetLogRequest): Promise<PullGetLogResponse> {
    const log = await this.repo.readCommitLog();

    let startIdx = 0;
    if (start) {
      startIdx = log.findIndex(e => e.id === start);
      if (startIdx < 0) {
        throw new Error("Invalid start commit ID!");
      }
    }

    const page = log.slice(startIdx, startIdx + limit);
    return {
      // Dates are sent as their numeric value (`date.valueOf()`).
      commits: page.map(({ id, root, before, merge, date }) => ({
        id,
        root,
        before,
        merge,
        date: date.valueOf()
      }))
    };
  }

  /**
   * Looks up the object `id`. It is reported as found only if it exists and
   * its stored type equals the requested `type`.
   */
  async PullObject({ id, type }: PullObjectRequest): Promise<PullObjectResponse> {
    const res = await this.repo.readObjectTyped(id);

    if (!res || res.type !== type) {
      return {
        found: false,
        data: undefined as any
      };
    }

    return {
      found: true,
      data: res.data
    };
  }

  /**
   * Compares the client's head (`clientCommit`) with the repository head and
   * tells the client whether it needs to pull and/or push.
   */
  async SyncInit({ clientCommit }: SyncInitRequest): Promise<SyncInitResponse> {
    const head = await this.repo.readHead();

    if (!head) {
      // Nothing here to pull; the client pushes only if it has a commit.
      return {
        needPull: false,
        needPush: !!clientCommit,
        remoteCommit: undefined as never as string
      };
    }

    if (head.id === clientCommit) {
      return {
        needPull: false,
        needPush: false,
        remoteCommit: head.id
      };
    }

    if (!clientCommit) {
      return {
        needPull: true,
        needPush: false,
        remoteCommit: head.id
      };
    }

    // NOTE(review): presumably the log ends at `clientCommit` when that commit
    // is known to this repository - confirm against Repository.readCommitLog.
    const log = await this.repo.readCommitLog(clientCommit);
    const last = log[log.length - 1];
    // `last` is undefined for an empty log; treat that like an unknown commit
    // instead of failing with a TypeError.
    const clientCommitInLog = !!last && last.id === clientCommit;

    return {
      needPull: true, // More a could need pull
      needPush: !clientCommitInLog,
      remoteCommit: head.id
    };
  }

  /**
   * Stores an object sent by the client and verifies that the id computed by
   * the repository matches the id the client announced.
   *
   * @throws Error("Invalid type!") if `type` is not an own key of `ObjectTypes`.
   * @throws Error("Invalid ID!") if the written object's id differs from `id`.
   */
  async PushObject({ id, type, data }: PushObjectRequest): Promise<PushObjectResponse> {
    // `type` comes from the remote peer. An own-property check is used because
    // the `in` operator would also accept inherited keys such as "toString".
    if (!Object.prototype.hasOwnProperty.call(ObjectTypes, type)) {
      throw new Error("Invalid type!");
    }

    const newID = await this.repo.writeObject(data, type as any);
    if (newID !== id) //TODO: Maybe cleanup wrong object?
      throw new Error("Invalid ID!");

    return {
      success: true
    };
  }
}
|
|
|
|
/**
 * Serves sync requests arriving on `stream` from `repo`.
 *
 * The returned promise resolves once the stream emits "close" (after the
 * session has been closed) and rejects if handling an incoming message fails.
 */
export async function startSyncRemote(stream: ISimpleStream, repo: Repository) {
  const provider = new ServiceProviderServer();
  provider.addService(new SyncRemote(repo));
  const session = provider.getSession(stream.send.bind(stream));

  return new Promise<void>((resolve, reject) => {
    stream.on("data", (chunk) => session.onMessage(chunk).catch(reject));
    stream.on("close", () => {
      session.close();
      resolve();
    });
  });
}