Skip to content

Commit 8984343

Browse files
committed
fix: 🐛 emit all locally written patches
1 parent bde0dd4 commit 8984343

File tree

3 files changed

+137
-15
lines changed

3 files changed

+137
-15
lines changed

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ export class LevelLocalRepo implements LocalRepo {
699699
const {id, patches} = req;
700700
const keyBase = await this.blockKeyBase(id);
701701
if (!patches || !patches.length) throw new Error('EMPTY_BATCH');
702-
const rebasedPatches: Uint8Array[] = [];
702+
const writtenPatches: Uint8Array[] = [];
703703
let cursor: LevelLocalRepoCursor = -1;
704704
// TODO: Check if `patches` need rebasing, if not, just merge.
705705
// TODO: Return correct response.
@@ -740,7 +740,7 @@ export class LevelLocalRepo implements LocalRepo {
740740
const time = id.time;
741741
const patchKey = this.frontierKey(keyBase, time);
742742
const uint8 = rebased.toBinary();
743-
rebasedPatches.push(uint8);
743+
writtenPatches.push(uint8);
744744
const op: BinStrLevelOperation = {type: 'put', key: patchKey,value: uint8};
745745
ops.push(op);
746746
}
@@ -750,6 +750,9 @@ export class LevelLocalRepo implements LocalRepo {
750750
}
751751
return false;
752752
});
753+
if (writtenPatches.length) {
754+
this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session});
755+
}
753756
if (!didPush && !needsReset) {
754757
const merge = await this.readFrontier0(keyBase);
755758
return {cursor, merge};
@@ -760,8 +763,6 @@ export class LevelLocalRepo implements LocalRepo {
760763
if (this._stopped) return;
761764
this.opts.onSyncError?.(error);
762765
});
763-
if (rebasedPatches.length)
764-
this.pubsub.pub({type: 'rebase', id, patches: rebasedPatches, session: req.session});
765766
if (needsReset) {
766767
const {cursor, model} = await this._syncRead0(keyBase);
767768
return {cursor, model, remote};
Lines changed: 130 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import {NodeBuilder, s} from 'json-joy/lib/json-crdt';
22
import {setup as setup0} from './setup';
33
import {until} from 'thingies/lib/until';
4+
import {EditSession} from '../EditSession';
5+
import {EditSessionFactory} from '../EditSessionFactory';
6+
import {BehaviorSubject} from 'rxjs';
47

5-
const setupTwoSessions = async (schema?: undefined | NodeBuilder) => {
8+
type TwoSessionsSetup = (schema?: undefined | NodeBuilder) => Promise<[EditSession, EditSession, () => Promise<void>]>;
9+
10+
const setupTwoRemoteSessions: TwoSessionsSetup = async (schema?: undefined | NodeBuilder) => {
611
const kit = await setup0();
712
const id = kit.blockId;
8-
const session1 = kit.sessions.make({id, schema});
13+
const session1 = kit.sessions.make({id, schema, session: 1});
914
await until(async () => {
1015
try {
1116
await kit.getModelFromRemote(id.join('/'));
@@ -14,32 +19,80 @@ const setupTwoSessions = async (schema?: undefined | NodeBuilder) => {
1419
return false;
1520
}
1621
});
17-
const session2 = await kit.sessions.load({id, remote: {}});
22+
const session2 = await kit.sessions.load({id, session: 2, remote: {}});
23+
const stop = async () => {
24+
await session1.dispose();
25+
await session2.dispose();
26+
await kit.stop();
27+
};
28+
return [session1, session2, stop];
29+
};
30+
31+
const setupTwoLocalSessions: TwoSessionsSetup = async (schema?: undefined | NodeBuilder) => {
32+
const kit = await setup0({local: {connected$: new BehaviorSubject(false)}});
33+
const id = kit.blockId;
34+
const session1 = kit.sessions.make({id, schema, session: 1});
35+
await until(async () => {
36+
try {
37+
await kit.local.get({id});
38+
return true;
39+
} catch {
40+
return false;
41+
}
42+
});
43+
const local2 = await kit.createLocal();
44+
const sessions2 = new EditSessionFactory({
45+
sid: local2.sid,
46+
repo: local2.local,
47+
});
48+
const session2 = await sessions2.load({id, session: 2});
49+
const stop = async () => {
50+
await session1.dispose();
51+
await session2.dispose();
52+
await local2.stop();
53+
await kit.stop();
54+
};
55+
return [session1, session2, stop];
56+
};
57+
58+
const setupTwoSameTabSessions: TwoSessionsSetup = async (schema?: undefined | NodeBuilder) => {
59+
const kit = await setup0({local: {connected$: new BehaviorSubject(false)}});
60+
const id = kit.blockId;
61+
const session1 = kit.sessions.make({id, schema, session: 1});
62+
await until(async () => {
63+
try {
64+
await kit.local.get({id});
65+
return true;
66+
} catch {
67+
return false;
68+
}
69+
});
70+
const session2 = await kit.sessions.load({id, session: 2});
1871
const stop = async () => {
1972
await session1.dispose();
2073
await session2.dispose();
2174
await kit.stop();
2275
};
23-
return [session1, session2, stop] as const;
76+
return [session1, session2, stop];
2477
};
2578

26-
describe('sync through remote server', () => {
27-
test('can load block which exists remotely (no schema)', async () => {
79+
const runTwoSessionsTests = (setupTwoSessions: TwoSessionsSetup) => {
80+
test('can load block created by another session (no schema)', async () => {
2881
const [session1, session2, stop] = await setupTwoSessions();
2982
expect(session1.model.view()).toBe(undefined);
3083
expect(session2.model.view()).toBe(undefined);
3184
await stop();
3285
});
3386

34-
test('can load block which exists remotely (with schema)', async () => {
87+
test('can load block created by another session (with schema)', async () => {
3588
const schema = s.obj({foo: s.str('bar')});
3689
const [session1, session2, stop] = await setupTwoSessions(schema);
3790
expect(session1.model.view()).toEqual({foo: 'bar'});
3891
expect(session2.model.view()).toEqual({foo: 'bar'});
3992
await stop();
4093
});
4194

42-
test('receives a changes done remotely', async () => {
95+
test('receives changes done in another session', async () => {
4396
const schema = undefined;
4497
const [session1, session2, stop] = await setupTwoSessions(schema);
4598
expect(session1.model.view()).toBe(undefined);
@@ -51,7 +104,7 @@ describe('sync through remote server', () => {
51104
await stop();
52105
});
53106

54-
test('receives a changes done remotely (reverse)', async () => {
107+
test('receives changes done in another session (reverse)', async () => {
55108
const schema = undefined;
56109
const [session1, session2, stop] = await setupTwoSessions(schema);
57110
expect(session1.model.view()).toBe(undefined);
@@ -62,4 +115,72 @@ describe('sync through remote server', () => {
62115
expect(session2.model.view()).toEqual({foo: 'bar'});
63116
await stop();
64117
});
118+
119+
test('two sessions can do edits simultaneously (with schema)', async () => {
120+
const schema = s.obj({});
121+
const [session1, session2, stop] = await setupTwoSessions(schema);
122+
session1.model.api.obj([]).set({foo: 'bar'});
123+
session2.model.api.obj([]).set({baz: 'qux'});
124+
await until(() => session2.model.view()?.foo === 'bar');
125+
await until(() => session1.model.view()?.baz === 'qux');
126+
expect(session1.model.view()).toEqual({foo: 'bar', baz: 'qux'});
127+
expect(session2.model.view()).toEqual({foo: 'bar', baz: 'qux'});
128+
await stop();
129+
});
130+
131+
test('two sessions can do edits simultaneously', async () => {
132+
const schema = undefined;
133+
const [session1, session2, stop] = await setupTwoSessions(schema);
134+
session1.model.api.root({});
135+
await until(() => !!session2.model.view());
136+
await until(() => !!session1.model.view());
137+
session1.model.api.obj([]).set({foo: 'bar'});
138+
session2.model.api.obj([]).set({baz: 'qux'});
139+
await until(() => session2.model.view()?.foo === 'bar');
140+
await until(() => session1.model.view()?.baz === 'qux');
141+
expect(session1.model.view()).toEqual({foo: 'bar', baz: 'qux'});
142+
expect(session2.model.view()).toEqual({foo: 'bar', baz: 'qux'});
143+
await stop();
144+
});
145+
146+
test('can synchronize two session doing multiple edits', async () => {
147+
const schema = undefined;
148+
const [session1, session2, stop] = await setupTwoSessions(schema);
149+
expect(session1.model.view()).toBe(undefined);
150+
expect(session2.model.view()).toBe(undefined);
151+
session1.model.api.root({foo: 'bar'});
152+
await until(() => session2.model.view()?.foo === 'bar');
153+
154+
session1.model.api.obj([]).set({x: 'x'});
155+
session1.model.api.obj([]).set({y: 'y'});
156+
await until(() => session2.model.view()?.y === 'y');
157+
expect(session1.model.view()).toEqual({foo: 'bar', x: 'x', y: 'y'});
158+
expect(session2.model.view()).toEqual({foo: 'bar', x: 'x', y: 'y'});
159+
160+
session1.model.api.obj([]).set({x: '1'});
161+
session2.model.api.obj([]).set({y: '2'});
162+
await until(() => session2.model.view()?.x === '1');
163+
await until(() => session1.model.view()?.y === '2');
164+
expect(session1.model.view()).toEqual({foo: 'bar', x: '1', y: '2'});
165+
expect(session2.model.view()).toEqual({foo: 'bar', x: '1', y: '2'});
166+
167+
session2.model.api.obj([]).set({z: 'z'});
168+
await until(() => session1.model.view()?.z === 'z');
169+
expect(session1.model.view()).toEqual({foo: 'bar', x: '1', y: '2', z: 'z'});
170+
expect(session2.model.view()).toEqual({foo: 'bar', x: '1', y: '2', z: 'z'});
171+
172+
await stop();
173+
});
174+
};
175+
176+
describe('two remote sessions', () => {
177+
runTwoSessionsTests(setupTwoRemoteSessions);
178+
});
179+
180+
describe('two local sessions', () => {
181+
runTwoSessionsTests(setupTwoLocalSessions);
182+
});
183+
184+
describe('two same-tab sessions', () => {
185+
runTwoSessionsTests(setupTwoSameTabSessions);
65186
});

src/json-crdt-repo/session/__tests__/setup.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import {setup as setupLocalLevel} from '../../local/level/__tests__/setup';
22
import {EditSessionFactory} from '../EditSessionFactory';
33

4-
export const setup = async () => {
5-
const kit = await setupLocalLevel();
4+
export const setup = async (opts?: Parameters<typeof setupLocalLevel>[0]) => {
5+
const kit = await setupLocalLevel(opts);
66
const createSessions = async (local = kit) => {
77
const sessions = new EditSessionFactory({
88
sid: local.sid,

0 commit comments

Comments
 (0)