Skip to content

Commit ee50fa4

Browse files
committed
feat: 🎸 implement background sync loop
1 parent 7b6979f commit ee50fa4

File tree

3 files changed

+99
-69
lines changed

3 files changed

+99
-69
lines changed

src/json-crdt-repo/__tests__/testbed.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export class BrowserTabTestbed {
7171
onSyncError: (error) => console.error(error),
7272
...opts,
7373
});
74+
repo.start();
7475
return new LocalRepoTestbed(this, repo);
7576
}
7677

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import type {
3636
SyncResult,
3737
LevelLocalRepoPubSub,
3838
LevelLocalRepoCursor,
39+
BlockSyncRecord,
3940
} from './types';
4041
import type {CrudLocalRepoCipher} from './types';
4142
import type {Locks} from 'thingies/lib/Locks';
@@ -61,7 +62,7 @@ const enum Defaults {
6162
* The root of the key-space where items are marked as "dirty" and need sync.
6263
*
6364
* ```
64-
* s!<collection>!<id>
65+
* s!b!<collection>!<id>
6566
* ```
6667
*/
6768
SyncRoot = 's',
@@ -201,15 +202,14 @@ export class LevelLocalRepo implements LocalRepo {
201202
@once
202203
public start(): void {
203204
this._conSub = this.connected$.subscribe((connected) => {
204-
if (connected) {
205-
this.syncAll().catch(() => {});
206-
} else {
207-
}
205+
if (!this._remoteSyncLoopActive) this.runRemoteSyncLoop();
208206
});
209207
}
210208

211209
@once
212210
public stop(): void {
211+
this._remoteSyncLoopActive = false;
212+
clearTimeout(this._remoteSyncDelayTimer as any);
213213
this._stopped = true;
214214
this._conSub?.unsubscribe();
215215
}
@@ -270,6 +270,10 @@ export class LevelLocalRepo implements LocalRepo {
270270
return this.snapshotKeyBase(blockKeyBase) + seqFormatted;
271271
}
272272

273+
protected syncKey(keyBase: string): string {
274+
return Defaults.SyncRoot + '!' + keyBase;
275+
}
276+
273277
protected async _exists(keyBase: string): Promise<boolean> {
274278
const metaKey = keyBase + Defaults.Metadata;
275279
const exists = (await this.kv.keys({gte: metaKey, lte: metaKey, limit: 1}).all()).length > 0;
@@ -404,22 +408,33 @@ export class LevelLocalRepo implements LocalRepo {
404408
return await this.locks.lock(keyBase, 500, 500)<T>(fn);
405409
}
406410

407-
// ---------------------------------------------------------- Synchronization
411+
protected async lockBlockForSync<T>(keyBase: string, fn: () => Promise<T>): Promise<T> {
412+
const key = 's!' + keyBase;
413+
return await this.locks.lock(key, 2000, 3000)<T>(fn);
414+
}
415+
416+
protected isBlockLockedForSync(keyBase: string): boolean {
417+
const key = 's!' + keyBase;
418+
return this.locks.isLocked(key);
419+
}
420+
421+
// --------------------------------------------------- Remote synchronization
408422

409-
protected async markDirty(id: BlockId): Promise<void> {
410-
const key = Defaults.SyncRoot + '!' + id.join('!');
411-
const blob = this.codec.encoder.encode(Date.now());
423+
protected async markDirty(keyBase: string, id: BlockId): Promise<void> {
424+
const key = this.syncKey(keyBase);
425+
const record: BlockSyncRecord = [id, Date.now()];
426+
const blob = await this.encode(record, false);
412427
await this.kv.put(key, blob);
413428
}
414429

415-
protected async markDirtyAndSync(id: BlockId): Promise<boolean> {
430+
protected async markDirtyAndSync(keyBase: string, id: BlockId): Promise<boolean> {
416431
if (this._stopped) return false;
417-
this.markDirty(id).catch((error) => {
432+
this.markDirty(keyBase, id).catch((error) => {
418433
if (this._stopped) return;
419434
this.opts.onSyncError?.(error);
420435
});
421436
try {
422-
return await this.push(id, true);
437+
return await this.push(keyBase, id, true);
423438
} catch (error) {
424439
if (typeof error === 'object' && error && (error as any).message === 'Not Found') return false;
425440
else if (error instanceof Error && error.message === 'DISCONNECTED') return false;
@@ -436,15 +451,13 @@ export class LevelLocalRepo implements LocalRepo {
436451
* @param id Block ID.
437452
* @param pull Whether to pull if there are no patches to push.
438453
*/
439-
protected async push(id: BlockId, doPull: boolean = false): Promise<boolean> {
454+
protected async push(keyBase: string, id: BlockId, doPull: boolean = false): Promise<boolean> {
440455
if (this._stopped) return false;
441456
if (!this.connected$.getValue()) throw new Error('DISCONNECTED');
442-
const keyBase = await this.blockKeyBase(id);
443457
const remote = this.opts.rpc;
444458
const remoteId = id.join('/');
445459
const patches: ServerPatch[] = [];
446-
const syncMarkerKey = Defaults.SyncRoot + '!' + id.join('!');
447-
const ops: BinStrLevelOperation[] = [{type: 'del', key: syncMarkerKey}];
460+
const ops: BinStrLevelOperation[] = [{type: 'del', key: this.syncKey(keyBase)}];
448461
for await (const [key, blob] of this.readFrontierBlobs0(keyBase)) {
449462
ops.push({type: 'del', key});
450463
patches.push({blob});
@@ -548,61 +561,65 @@ export class LevelLocalRepo implements LocalRepo {
548561
});
549562
}
550563

551-
public async isDirty(collection: string[], id: string): Promise<boolean> {
552-
throw new Error('not implemented');
553-
// const dir = ['sync', 'dirty', ...collection];
554-
// try {
555-
// await this.core.crud.info(dir, id);
556-
// return true;
557-
// } catch (error) {
558-
// if (error instanceof DOMException && error.name === 'ResourceNotFound') return false;
559-
// throw error;
560-
// }
564+
/**
565+
* Iterates over all blocks marked as dirty.
566+
*/
567+
protected async *listDirty(): AsyncIterableIterator<BlockSyncRecord> {
568+
const gt: string = Defaults.SyncRoot;
569+
const lt: string = Defaults.SyncRoot + '~';
570+
for await (const blob of this.kv.values({gt, lt}))
571+
yield await this.decode(blob, false) as BlockSyncRecord;
561572
}
562573

563-
protected async *listDirty(collection: string[] = ['sync', 'dirty']): AsyncIterableIterator<BlockId> {
564-
throw new Error('not implemented');
565-
// for await (const entry of this.core.crud.scan(collection)) {
566-
// if (entry.type === 'collection') yield* this.listDirty([...collection, entry.id]);
567-
// else yield {collection, id: entry.id};
568-
// }
574+
public async remoteSyncAll(): Promise<SyncResult[]> {
575+
const resultList: SyncResult[] = [];
576+
for await (const record of this.listDirty()) {
577+
if (this._stopped) return resultList;
578+
if (!this._remoteSyncLoopActive) return resultList;
579+
if (!this.connected$.getValue()) return resultList;
580+
const [id] = record;
581+
const keyBase = await this.blockKeyBase(id);
582+
const isLocked = this.isBlockLockedForSync(keyBase);
583+
if (isLocked) continue;
584+
await this.lockBlockForSync(keyBase, async () => {
585+
let error: unknown;
586+
let success: boolean = false;
587+
try {
588+
success = await this.push(keyBase, id, true);
589+
} catch (err) {
590+
error = err;
591+
}
592+
const result: SyncResult = [id, success, error];
593+
resultList.push(result);
594+
});
595+
}
596+
return resultList;
569597
}
570598

571-
protected async *syncDirty(): AsyncIterableIterator<SyncResult> {
572-
// for await (const block of this.listDirty()) {
573-
// const {
574-
// collection: [_sync, _dirty, ...collection],
575-
// id,
576-
// } = block;
577-
// try {
578-
// const success = await this.sync(collection, id);
579-
// yield [block, success];
580-
// } catch (error) {
581-
// yield [block, false, error];
582-
// }
583-
// }
584-
}
599+
protected _remoteSyncLoopActive: boolean = false;
600+
protected _remoteSyncDelayTimer: unknown;
585601

586-
public async syncAll(): Promise<SyncResult[]> {
587-
throw new Error('not implemented');
588-
// const locks = this.locks;
589-
// if (locks.isLocked('sync')) return [];
590-
// const list: SyncResultList = [];
591-
// const duration = 30000;
592-
// const start = Date.now();
593-
// return await locks.lock(
594-
// 'sync',
595-
// duration,
596-
// 3000,
597-
// )(async () => {
598-
// for await (const result of this.syncDirty()) {
599-
// if (!this.core.connected$.getValue()) return [];
600-
// list.push(result);
601-
// const now = Date.now();
602-
// if (now - start + 100 > duration) break;
603-
// }
604-
// return list;
605-
// });
602+
protected runRemoteSyncLoop(): void {
603+
this._remoteSyncLoopActive = true;
604+
if (!this.connected$.getValue()) {
605+
this._remoteSyncLoopActive = false;
606+
return;
607+
}
608+
this.remoteSyncAll()
609+
.catch((error) => {
610+
this.opts.onSyncError?.(error);
611+
})
612+
.finally(() => {
613+
if (!this._remoteSyncLoopActive) return;
614+
if (!this.connected$.getValue()) {
615+
this._remoteSyncLoopActive = false;
616+
return;
617+
}
618+
this._remoteSyncDelayTimer = setTimeout(() => {
619+
if (!this._remoteSyncLoopActive) return;
620+
this.runRemoteSyncLoop();
621+
}, 1000);
622+
});
606623
}
607624

608625
/** ----------------------------------------------------- {@link LocalRepo} */
@@ -635,7 +652,7 @@ export class LevelLocalRepo implements LocalRepo {
635652
if (exists) throw new Error('EXISTS');
636653
await this.kv.batch(ops);
637654
});
638-
const remote = this.markDirtyAndSync(id)
655+
const remote = this.markDirtyAndSync(keyBase, id)
639656
.then(() => {})
640657
.catch((error) => {
641658
if (this._stopped) return;
@@ -775,7 +792,7 @@ export class LevelLocalRepo implements LocalRepo {
775792
const merge = await this.readFrontier0(keyBase);
776793
return {cursor, merge};
777794
}
778-
const remote = this.markDirtyAndSync(id)
795+
const remote = this.markDirtyAndSync(keyBase, id)
779796
.then(() => {})
780797
.catch((error) => {
781798
if (this._stopped) return;

src/json-crdt-repo/local/level/types.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,19 @@ export interface CrudLocalRepoCipher {
3939
decrypt(ciphertext: Uint8Array): Promise<Uint8Array>;
4040
}
4141

42-
export type SyncResult = [block: BlockId, success: boolean, err?: Error | unknown];
42+
export type BlockSyncRecord = [
43+
/**
44+
* Unencrypted block ID.
45+
*/
46+
id: BlockId,
47+
48+
/**
49+
* Time when block was marked as dirty.
50+
*/
51+
ts: number,
52+
];
53+
54+
export type SyncResult = [block: BlockId, success: boolean, err?: Error | unknown | undefined];
4355

4456
export type LocalBatch = ServerBatch;
4557
export type LocalSnapshot = ServerSnapshot;

0 commit comments

Comments
 (0)