Skip to content

Commit 1377676

Browse files
committed
feat: 🎸 improve local repo
1 parent 8984343 commit 1377676

13 files changed

+332
-102
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import {LevelLocalRepo, LevelLocalRepoOpts} from '../local/level/LevelLocalRepo';
2+
import {Locks} from 'thingies/lib/Locks';
3+
import {BehaviorSubject} from 'rxjs';
4+
import {setup as remoteSetup} from '../remote/__tests__/setup';
5+
import {MemoryLevel} from 'memory-level';
6+
import {pubsub as createPubsub} from '../pubsub';
7+
import {BinStrLevel, LevelLocalRepoPubSub} from '../local/level/types';
8+
import {EditSessionFactory} from '../session/EditSessionFactory';
9+
10+
export class Testbed {
11+
public static readonly create = () => {
12+
return new Testbed();
13+
};
14+
15+
public static readonly createRepo = (opts?: Partial<LevelLocalRepoOpts>) => {
16+
return Testbed.create().createBrowser().createTab().createRepo(opts);
17+
};
18+
19+
constructor(
20+
public readonly remote: ReturnType<typeof remoteSetup> = remoteSetup(),
21+
public genId: (() => string) = (() => Date.now().toString(36) + Math.random().toString(36).slice(2)),
22+
) {}
23+
24+
public createBrowser(): BrowserTestbed {
25+
return new BrowserTestbed(this);
26+
}
27+
}
28+
29+
export class BrowserTestbed {
30+
public readonly id: string;
31+
public readonly locks: Locks;
32+
public readonly kv: BinStrLevel;
33+
34+
constructor(
35+
public readonly global: Testbed,
36+
public readonly sid: number = 12345678,
37+
) {
38+
this.id = this.global.genId();
39+
this.locks = new Locks();
40+
this.kv = new MemoryLevel<string, Uint8Array>({
41+
keyEncoding: 'utf8',
42+
valueEncoding: 'view',
43+
}) as unknown as BinStrLevel
44+
}
45+
46+
public createTab(): BrowserTabTestbed {
47+
return new BrowserTabTestbed(this);
48+
}
49+
}
50+
51+
export class BrowserTabTestbed {
52+
public readonly pubsubBusName: string;
53+
public readonly pubsub: LevelLocalRepoPubSub;
54+
55+
constructor (
56+
public readonly browser: BrowserTestbed,
57+
) {
58+
this.pubsubBusName = 'pubsub-bus-' + this.browser.id;
59+
this.pubsub = createPubsub(this.pubsubBusName) as LevelLocalRepoPubSub;
60+
}
61+
62+
public createRepo(opts?: Partial<LevelLocalRepoOpts>): LocalRepoTestbed {
63+
const repo = new LevelLocalRepo({
64+
kv: this.browser.kv,
65+
locks: this.browser.locks,
66+
sid: this.browser.sid,
67+
rpc: this.browser.global.remote.remote,
68+
pubsub: this.pubsub,
69+
connected$: new BehaviorSubject(true),
70+
onSyncError: (error) => console.error(error),
71+
...opts,
72+
});
73+
return new LocalRepoTestbed(this, repo);
74+
}
75+
76+
public readonly stop = async () => {
77+
this.pubsub.end();
78+
};
79+
}
80+
81+
export class LocalRepoTestbed {
82+
public readonly sessions: EditSessionFactory;
83+
public readonly col: string[] = ['collection', 'sub-collection'];
84+
public readonly blockId: string[] = [...this.col, this.tab.browser.id];
85+
86+
constructor (
87+
public readonly tab: BrowserTabTestbed,
88+
public readonly repo: LevelLocalRepo,
89+
) {
90+
this.sessions = new EditSessionFactory({
91+
sid: tab.browser.sid,
92+
repo,
93+
});
94+
}
95+
96+
public readonly stop = async () => {
97+
await this.repo.stop();
98+
};
99+
100+
public readonly stopTab = async () => {
101+
await this.stop();
102+
await this.tab.stop();
103+
};
104+
}

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ export class LevelLocalRepo implements LocalRepo {
819819
}),
820820
]);
821821
});
822+
await this.opts.rpc.delete?.(id.join('/'));
822823
}
823824

824825
/**

src/json-crdt-repo/local/level/__tests__/LevelLocalRepo.pull.spec.ts

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {Model, Patch, s} from 'json-joy/lib/json-crdt';
22
import {setup} from './setup';
33
import {firstValueFrom, ReplaySubject} from 'rxjs';
44
import {LocalRepo, LocalRepoMergeEvent, LocalRepoResetEvent} from '../../types';
5+
import {until} from 'thingies';
56

67
const get = async (kit: Awaited<ReturnType<typeof setup>>, id = kit.blockId): Promise<Model> => {
78
const {block} = await kit.remote.client.call('block.get', {id: id.join('/')});
@@ -201,54 +202,6 @@ describe('.pull()', () => {
201202
await kit.stop();
202203
});
203204

204-
test('catches up using "merge" strategy', async () => {
205-
const kit = await setup();
206-
const schema = s.obj({foo: s.str('bar')});
207-
const model = Model.create(schema, kit.sid);
208-
const patches = [model.api.flush()];
209-
const local: LocalRepo = kit.local;
210-
const res = await local.sync({id: kit.blockId, patches});
211-
await res.remote;
212-
const model2 = await get(kit);
213-
expect(model2.view()).toEqual({foo: 'bar'});
214-
model2.api.obj([]).set({foo: 'baz'});
215-
await kit.remote.client.call('block.upd', {
216-
id: kit.blockId.join('/'),
217-
batch: {
218-
patches: [{
219-
blob: model2.api.flush().toBinary(),
220-
}],
221-
},
222-
});
223-
model2.api.obj([]).set({x: 1});
224-
await kit.remote.client.call('block.upd', {
225-
id: kit.blockId.join('/'),
226-
batch: {
227-
patches: [{
228-
blob: model2.api.flush().toBinary(),
229-
}],
230-
},
231-
});
232-
const model3 = await get(kit);
233-
expect(model3.view()).toEqual({foo: 'baz', x: 1});
234-
const events$ = new ReplaySubject<LocalRepoMergeEvent>(1);
235-
let cnt = 0;
236-
kit.local.change$(kit.blockId).subscribe((event) => {
237-
if (!(event as LocalRepoMergeEvent).merge) return;
238-
events$.next(event as LocalRepoMergeEvent);
239-
cnt++;
240-
});
241-
await kit.local.pull(kit.blockId);
242-
const event = await firstValueFrom(events$);
243-
expect(cnt).toBe(1);
244-
expect(model.view()).toEqual({foo: 'bar'});
245-
for (const patch of event.merge) model.applyPatch(patch);
246-
expect(model.view()).toEqual({foo: 'baz', x: 1});
247-
const read = await kit.local.get({id: kit.blockId});
248-
expect(read.model.view()).toEqual({foo: 'baz', x: 1});
249-
await kit.stop();
250-
});
251-
252205
test('catches up using "reset" strategy', async () => {
253206
const kit = await setup();
254207
const schema = s.obj({foo: s.str('bar')});

src/json-crdt-repo/local/level/__tests__/setup.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,31 @@ export const setup = async (
4545
};
4646
const {sid, local, pubsub, stop} = createLocal();
4747
local.start();
48+
const createRemote = (localOpts: Partial<LevelLocalRepoOpts> = {}) => {
49+
const sid = localOpts.sid ?? 123456789;
50+
const busName = 'test-' + id;
51+
const pubsub = createPubsub(busName) as LevelLocalRepoPubSub;
52+
const kv = new MemoryLevel<string, Uint8Array>({
53+
keyEncoding: 'utf8',
54+
valueEncoding: 'view',
55+
}) as unknown as BinStrLevel;
56+
const locks = new Locks();
57+
const local = new LevelLocalRepo({
58+
kv,
59+
locks,
60+
sid,
61+
rpc: remote.remote,
62+
connected$: new BehaviorSubject(true),
63+
pubsub,
64+
onSyncError: (error) => console.error(error),
65+
...opts.local,
66+
});
67+
const stop = async () => {
68+
await local.stop();
69+
pubsub.end();
70+
};
71+
return {kv, sid, local, pubsub, stop};
72+
};
4873
const log = Log.fromNewModel(Model.create(undefined, sid));
4974
const getModelFromRemote = async (id: string = blockId.join('/')): Promise<Model> => {
5075
const res = await remote.client.call('block.get', {id});
@@ -60,6 +85,7 @@ export const setup = async (
6085
remote,
6186
locks,
6287
createLocal,
88+
createRemote,
6389
pubsub,
6490
local,
6591
sid,

src/json-crdt-repo/session/EditSession.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export class EditSession {
3838
}
3939

4040
public dispose(): void {
41+
if (this._stopped) return;
4142
this._stopped = true;
4243
this._stop$.next();
4344
}
@@ -114,6 +115,12 @@ export class EditSession {
114115
});
115116
}
116117

118+
public async del(): Promise<void> {
119+
this.clear();
120+
this.dispose();
121+
await this.repo.del(this.id);
122+
}
123+
117124
/**
118125
* Load latest state from the local repo.
119126
*/
@@ -132,7 +139,6 @@ export class EditSession {
132139
this.start = model.clone();
133140
const log = this.log;
134141
const end = log.end;
135-
// TODO: Remove this condition, make flush always safe to call.
136142
if (end.api.builder.patch.ops.length) end.api.flush();
137143
end.reset(model);
138144
log.patches.forEach((patch) => end.applyPatch(patch.v));
@@ -142,7 +148,6 @@ export class EditSession {
142148
if (patches.length === 0) return;
143149
const log = this.log;
144150
const end = log.end;
145-
// TODO: Remove this condition, make flush always safe to call.
146151
if (end.api.builder.patch.ops.length) end.api.flush();
147152
if (log.patches.size() === 0) {
148153
this.merge(patches);

src/json-crdt-repo/session/EditSessionFactory.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export class EditSessionFactory {
1616
* with a given ID already exists, it asynchronously synchronizes the local
1717
* and remote state.
1818
*/
19-
public make(opts: EditSessionMakeOpts): EditSession {
19+
public make(opts: EditSessionMakeOpts): {session: EditSession, sync?: Promise<void>} {
2020
const {id, schema, pull = true} = opts;
2121
const factoryOpts = this.opts;
2222
const model = Model.create(void 0, factoryOpts.sid);
@@ -26,11 +26,12 @@ export class EditSessionFactory {
2626
sessionModel.setSchema(schema);
2727
sessionModel.api.flush();
2828
}
29+
let sync: Promise<void> | undefined;
2930
if (pull && !session.log.patches.size()) {
30-
session.sync().catch(() => {});
31+
sync = session.sync().then(() => {}).catch(() => {});
3132
}
3233
session.log.end.api.autoFlush();
33-
return session;
34+
return {session, sync};
3435
}
3536

3637
/**
@@ -67,7 +68,7 @@ export class EditSessionFactory {
6768
} else throw error;
6869
}
6970
}
70-
if (opts.make) return this.make({session: opts.session, ...opts.make, id});
71+
if (opts.make) return this.make({session: opts.session, ...opts.make, id}).session;
7172
}
7273
throw error;
7374
}

src/json-crdt-repo/session/__tests__/EditSession.JsonPatch.spec.ts

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ import {JsonPatchStore} from 'json-joy/lib/json-crdt/json-patch';
33
import {setup} from './setup';
44
import {until} from 'thingies/lib/until';
55
import {EditSessionFactory} from '../EditSessionFactory';
6-
import {tick} from 'thingies';
76

87
describe('JSON Patch interface', () => {
98
test('can edit two sessions in the same tab', async () => {
109
const kit = await setup();
1110
const schema = s.obj({foo: s.con('bar')});
12-
const session1 = kit.sessions.make({id: kit.blockId, schema, session: 1});
11+
const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1});
1312
const session2 = await kit.sessions.load({id: kit.blockId, make: {schema, session: 2}});
1413
const jp1 = new JsonPatchStore(session1.model);
1514
const jp2 = new JsonPatchStore(session2.model);
@@ -46,10 +45,10 @@ describe('JSON Patch interface', () => {
4645
await kit.stop();
4746
});
4847

49-
test.only('can edit two sessions in different tabs', async () => {
48+
test('can edit two sessions in different tabs', async () => {
5049
const kit = await setup();
5150
const schema = s.obj({foo: s.con('bar')});
52-
const session1 = kit.sessions.make({id: kit.blockId, schema, session: 1});
51+
const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1});
5352
const local2 = await kit.createLocal();
5453
const sessions2 = new EditSessionFactory({
5554
sid: local2.sid,
@@ -72,19 +71,18 @@ describe('JSON Patch interface', () => {
7271
};
7372
await assertView({foo: 'bar'});
7473
jp2.update({op: 'add', path: '/tags', value: ['tag1', 'tag2']});
75-
await tick(123);
76-
// await assertView({foo: 'bar', tags: ['tag1', 'tag2']});
77-
// jp1.update({op: 'add', path: '/a', value: {b: 'c'}});
78-
// await assertView({foo: 'bar', tags: ['tag1', 'tag2'], a: {b: 'c'}});
79-
// jp1.update([
80-
// {op: 'str_ins', path: '/a/b', pos: 1, str: 'd'},
81-
// {op: 'add', path: '/a/x', value: 'y'},
82-
// ]);
83-
// jp2.update([
84-
// {op: 'remove', path: ['foo']},
85-
// {op: 'add', path: '/a/y', value: 'x'},
86-
// ]);
87-
// await assertView({tags: ['tag1', 'tag2'], a: {b: 'cd', x: 'y', y: 'x'}});
74+
await assertView({foo: 'bar', tags: ['tag1', 'tag2']});
75+
jp1.update({op: 'add', path: '/a', value: {b: 'c'}});
76+
await assertView({foo: 'bar', tags: ['tag1', 'tag2'], a: {b: 'c'}});
77+
jp1.update([
78+
{op: 'str_ins', path: '/a/b', pos: 1, str: 'd'},
79+
{op: 'add', path: '/a/x', value: 'y'},
80+
]);
81+
jp2.update([
82+
{op: 'remove', path: ['foo']},
83+
{op: 'add', path: '/a/y', value: 'x'},
84+
]);
85+
await assertView({tags: ['tag1', 'tag2'], a: {b: 'cd', x: 'y', y: 'x'}});
8886
await session1.dispose();
8987
await session2.dispose();
9088
await local2.stop();

0 commit comments

Comments
 (0)