Skip to content

Commit bf593ab

Browse files
committed
feat: 🎸 start synchronization when connection appears
1 parent bb3dc86 commit bf593ab

File tree

3 files changed

+90
-21
lines changed

3 files changed

+90
-21
lines changed

src/json-crdt-repo/local/server-crud/ServerCrudLocalHistory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export class ServerCrudLocalHistory implements LocalHistory {
3131
const remote = (async () => {
3232
await this.sync.markDirty(collection, id);
3333
// TODO: use pushNewBlock instead?
34-
const success = await this.sync.push(collection, id);
34+
const success = await this.sync.sync(collection, id);
3535
if (!success) throw new Error('NOT_SYNCED');
3636
})();
3737
remote.catch(() => {});

src/json-crdt-repo/local/server-crud/ServerCrudLocalHistorySync.ts

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import {timeout} from 'thingies/lib/timeout';
2+
import {SESSION} from 'json-joy/lib/json-crdt-patch/constants';
3+
import {ts} from 'json-joy/lib/json-crdt-patch';
4+
import {once} from 'thingies';
25
import type {RemoteBlockPatch} from '../../remote/types';
36
import type {ServerCrudLocalHistoryCore} from './ServerCrudLocalHistoryCore';
47
import type {BlockSyncMetadata} from './types';
5-
import {SESSION} from 'json-joy/lib/json-crdt-patch/constants';
6-
import {ts} from 'json-joy/lib/json-crdt-patch';
8+
import type {Subscription} from 'rxjs';
79

810
const SYNC_FILE_NAME = 'sync.cbor';
911

@@ -25,29 +27,38 @@ export interface ServerCrudLocalHistorySyncOpts {
2527
}
2628

2729
export class ServerCrudLocalHistorySync {
28-
private syncLoopTimer: any = 0;
30+
// private syncLoopTimer: any = 0;
31+
private _conSub: Subscription | undefined = undefined;
2932

3033
constructor(
3134
protected readonly opts: ServerCrudLocalHistorySyncOpts,
3235
protected readonly core: ServerCrudLocalHistoryCore,
3336
) {}
3437

38+
@once
3539
public start(): void {
36-
40+
this._conSub = this.core.connected$.subscribe((connected) => {
41+
if (connected) {
42+
this.syncAll().catch(() => {});
43+
} else {
44+
45+
}
46+
});
3747
}
3848

49+
@once
3950
public stop(): void {
40-
51+
this._conSub?.unsubscribe();
4152
}
4253

4354
protected remoteTimeout(): number {
4455
return this.opts.remoteTimeout ?? 5000;
4556
}
4657

47-
public async push(collection: string[], id: string): Promise<boolean> {
48-
return await this.lock<boolean>({collection, id}, async () => {
49-
// TODO: handle case when this times out, but actually succeeds, so on re-sync it handles the case when the block is already synced.
58+
public async sync(collection: string[], id: string): Promise<boolean> {
59+
return await this.lockItemSync<boolean>({collection, id}, async () => {
5060
try {
61+
// TODO: handle case when this times out, but actually succeeds, so on re-sync it handles the case when the block is already synced.
5162
return timeout(this.remoteTimeout(), async () => {
5263
const core = this.core;
5364
if (!core.connected$.getValue()) return false;
@@ -86,7 +97,10 @@ export class ServerCrudLocalHistorySync {
8697
});
8798
}
8899

89-
public async lock<T>(
100+
/**
101+
* Locks a specific item for synchronization.
102+
*/
103+
private async lockItemSync<T>(
90104
{
91105
collection,
92106
id,
@@ -141,28 +155,42 @@ export class ServerCrudLocalHistorySync {
141155
}
142156
}
143157

144-
protected async * listDirty(collection: string[] = ['sync', 'dirty']): AsyncIterableIterator<{collection: string[]; id: string}> {
158+
protected async * listDirty(collection: string[] = ['sync', 'dirty']): AsyncIterableIterator<ItemId> {
145159
for await (const entry of this.core.crud.scan(collection)) {
146160
if (entry.type === 'collection') yield* this.listDirty([...collection, entry.id]);
147161
else yield {collection, id: entry.id};
148162
}
149163
}
150164

151-
protected async * syncDirty(): AsyncIterableIterator<[block: {collection: string[]; id: string}, success: boolean]> {
165+
protected async * syncDirty(): AsyncIterableIterator<SyncResult> {
152166
for await (const block of this.listDirty()) {
153-
const {collection, id} = block;
154-
const success = await this.push(collection, id);
155-
yield [block, success];
167+
const {collection: [_sync, _dirty, ...collection], id} = block;
168+
try {
169+
const success = await this.sync(collection, id);
170+
yield [block, success];
171+
} catch (error) {
172+
yield [block, false, error];
173+
}
156174
}
157175
}
158176

159-
protected async syncAllDirty(): Promise<SyncResultList> {
177+
public async syncAll(): Promise<SyncResultList> {
178+
const locks = this.core.locks;
179+
if (locks.isLocked('sync')) return [];
160180
const list: SyncResultList = [];
161-
for await (const result of this.syncDirty()) list.push(result);
162-
return list;
181+
const duration = 30000;
182+
const start = Date.now();
183+
return await locks.lock('sync', duration, 3000)(async () => {
184+
for await (const result of this.syncDirty()) {
185+
list.push(result);
186+
const now = Date.now();
187+
if (now - start + 100 > duration) break;
188+
}
189+
return list;
190+
});
163191
}
164192
}
165193

166194
export type ItemId = {collection: string[], id: string};
167-
export type SyncResult = [block: ItemId, success: boolean];
195+
export type SyncResult = [block: ItemId, success: boolean, err?: Error | unknown];
168196
export type SyncResultList = SyncResult[];

src/json-crdt-repo/local/server-crud/__tests__/ServerCrudLocalHistory.spec.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {Model, nodes, s} from 'json-joy/lib/json-crdt';
77
import {Log} from 'json-joy/lib/json-crdt/log/Log';
88
import {BehaviorSubject} from 'rxjs';
99
import {setup as remoteSetup} from '../../../remote/__tests__/setup';
10-
import {tick} from 'thingies';
10+
import {tick, until} from 'thingies';
1111
import {SESSION} from 'json-joy/lib/json-crdt-patch/constants';
1212

1313
const setup = async (
@@ -30,11 +30,15 @@ const setup = async (
3030
connected$: new BehaviorSubject(true),
3131
...opts.local,
3232
});
33+
local.sync.start();
3334
const log = Log.fromNewModel(Model.withLogicalClock(sid));
3435
log.end.api.root({foo: 'bar'});
3536
log.end.api.flush();
3637
const genId = () => Date.now().toString(36) + Math.random().toString(36).slice(2);
3738
const id = genId();
39+
const stop = () => {
40+
local.sync.stop();
41+
};
3842
return {
3943
remote,
4044
fs,
@@ -47,6 +51,7 @@ const setup = async (
4751
log,
4852
genId,
4953
id,
54+
stop,
5055
};
5156
};
5257

@@ -171,6 +176,41 @@ describe('.create()', () => {
171176
});
172177
});
173178

179+
describe('when not connected, but connection resumes', () => {
180+
const setupNotConnected = async () => {
181+
const connected$ = new BehaviorSubject(false);
182+
const deps = await setup({
183+
local: {
184+
connected$,
185+
sync: {
186+
remoteTimeout: 100,
187+
},
188+
},
189+
});
190+
return {
191+
...deps,
192+
connected$,
193+
};
194+
};
195+
196+
test('does not store the block on remote at first, synchronizes it when connection resumes', async () => {
197+
const kit = await setupNotConnected();
198+
const res = await kit.local.create(['my', 'col'], kit.log, kit.id);
199+
expect(kit.remote.services.blocks.stats().blocks).toBe(0);
200+
try {
201+
await res.remote;
202+
throw 'not this error';
203+
} catch (error) {
204+
expect(error).toEqual(new Error('NOT_SYNCED'));
205+
}
206+
await tick(50);
207+
expect(kit.remote.services.blocks.stats().blocks).toBe(0);
208+
kit.connected$.next(true);
209+
await until(() => kit.remote.services.blocks.stats().blocks === 1);
210+
expect(kit.remote.services.blocks.stats().blocks).toBe(1);
211+
});
212+
});
213+
174214
describe('when remote call fails', () => {
175215
const setupFaultyConnection = () => {
176216
const remote = remoteSetup();
@@ -364,7 +404,8 @@ describe('.create()', () => {
364404
const isDirty = await kit.local.sync.isDirty(['my', 'col'], kit.id);
365405
expect(isDirty).toBe(true);
366406
await tick(200);
367-
expect(isDirty).toBe(false);
407+
const isDirty2 = await kit.local.sync.isDirty(['my', 'col'], kit.id);
408+
expect(isDirty2).toBe(false);
368409
});
369410
});
370411
});

0 commit comments

Comments
 (0)