Skip to content

Commit a49fb92

Browse files
committed
fix: 🐛 pass concurrent construction tests
1 parent 0cde405 commit a49fb92

File tree

4 files changed

+111
-6
lines changed

4 files changed

+111
-6
lines changed

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ export class LevelLocalRepo implements LocalRepo {
715715
} catch (error) {
716716
if (error instanceof Error && error.message === 'EXISTS')
717717
// TODO: make sure reset does not happen, if models are the same.
718+
// TODO: Check for `req.time` in `_syncRead`.
718719
return await this._syncRead(id);
719720
throw error;
720721
}
@@ -732,6 +733,17 @@ export class LevelLocalRepo implements LocalRepo {
732733

733734
private async _syncMerge(req: LocalRepoSyncRequest): Promise<LocalRepoSyncResponse> {
734735
const {id, patches} = req;
736+
let lastKnownTime: number = 0;
737+
const reqTime = req.time
738+
if (typeof reqTime === 'number') {
739+
lastKnownTime = reqTime;
740+
const firstPatch = patches?.[0];
741+
if (firstPatch?.getId()?.sid === SESSION.GLOBAL)
742+
lastKnownTime = firstPatch.getId()!.time + firstPatch.span() - 1;
743+
} else if (patches?.length) {
744+
const firstPatchTime = patches?.[0]?.getId()?.time;
745+
if (typeof firstPatchTime === 'number') lastKnownTime = firstPatchTime - 1;
746+
}
735747
const keyBase = await this.blockKeyBase(id);
736748
if (!patches || !patches.length) throw new Error('EMPTY_BATCH');
737749
const writtenPatches: Uint8Array[] = [];
@@ -740,9 +752,12 @@ export class LevelLocalRepo implements LocalRepo {
740752
// TODO: Return correct response.
741753
// TODO: Check that remote state is in sync, too.
742754
let needsReset = false;
755+
let cursorBehind = false;
743756
const didPush = await this.lockBlock(keyBase, async () => {
744757
const [tip, meta] = await Promise.all([this.readFrontierTip(keyBase), this.readMeta(keyBase)]);
758+
if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) cursorBehind = true;
745759
let nextTick = meta.time + 1;
760+
if (lastKnownTime < meta.time) needsReset = true;
746761
cursor = meta.seq;
747762
if (tip) {
748763
const tipTime = tip.getId()?.time ?? 0;
@@ -785,7 +800,7 @@ export class LevelLocalRepo implements LocalRepo {
785800
});
786801
if (!didPush && !needsReset) {
787802
const merge = await this.readFrontier0(keyBase);
788-
return {cursor, merge};
803+
return {cursor, merge, cursorBehind};
789804
}
790805
const remote = this.markDirtyAndSync(keyBase, id)
791806
.then(() => {})
@@ -795,9 +810,9 @@ export class LevelLocalRepo implements LocalRepo {
795810
});
796811
if (needsReset) {
797812
const {cursor, model} = await this._syncRead0(keyBase);
798-
return {cursor, model, remote};
813+
return {cursor, model, remote, cursorBehind};
799814
}
800-
return {cursor, remote};
815+
return {cursor, remote, cursorBehind};
801816
}
802817

803818
protected async readLocal0(keyBase: string): Promise<[model: Model, cursor: LevelLocalRepoCursor]> {

src/json-crdt-repo/local/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ export interface LocalRepoSyncRequest {
8787
*/
8888
id: BlockId;
8989

90+
/**
91+
* Logical clock time of the local operations which the client has caught up
92+
* to.
93+
*/
94+
time?: number;
95+
9096
/**
9197
* The last known cursor returned in the `.sync()` call response. The cursor
9298
* should be omitted in the first `.sync()` call, and then set to the value
@@ -115,6 +121,13 @@ export interface LocalRepoSyncResponse {
115121
*/
116122
cursor: undefined | unknown;
117123

124+
/**
125+
* Set to true if the client is behind the remote. When true, the client
126+
* should call `.getIf()` after a short wait period to check if the remote
127+
* is indeed ahead.
128+
*/
129+
cursorBehind?: boolean;
130+
118131
/**
119132
* Model snapshot that the client should reset its "start" state to. The
120133
* `Model` is sent when the *sync* call detects that the client is behind the

src/json-crdt-repo/session/EditSession.ts

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,14 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
8787
const length = patches.length;
8888
// TODO: After async call check that sync state is still valid. New patches, might have been added.
8989
if (length || this.cursor === undefined) {
90-
const res = await this.repo.sync({id: this.id, patches, cursor: this.cursor, session: this.session});
90+
let time = this.start.clock.time - 1;
91+
const res = await this.repo.sync({
92+
id: this.id,
93+
patches,
94+
time,
95+
cursor: this.cursor,
96+
session: this.session
97+
});
9198
if (this._stopped) return null;
9299
// TODO: After sync call succeeds, remove the patches from the log.
93100
if (length) {
@@ -96,16 +103,33 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
96103
if (lastId) this.log.advanceTo(lastId);
97104
this.start.applyBatch(patches);
98105
}
99-
if (typeof res.cursor !== undefined) this.cursor = res.cursor;
106+
// "cursor" shall not be returned from .sync() call. The cursor shall update
107+
// only when remote model changes are received, during local .sync() write
108+
// only the local model is updated.
109+
if (typeof res.cursor !== undefined) {
110+
this.cursor = res.cursor;
111+
}
100112
if (res.model) {
101113
this._syncRace(() => {
102114
this.reset(<any>res.model!);
103115
});
104-
} else if (res.merge) {
116+
} else if (res.merge && res.merge) {
105117
this._syncRace(() => {
106118
this.merge(<any>res.merge!);
107119
});
108120
}
121+
// if (res.cursorBehind) {
122+
// setTimeout(async () => {
123+
// if (!this._stopped) return;
124+
// const get = await this.repo.getIf({
125+
// id: this.id,
126+
// cursor: this.cursor
127+
// });
128+
// if (!this._stopped) return;
129+
// if (!get) return;
130+
// this.reset(<any>get.model);
131+
// }, 50);
132+
// }
109133
return {remote: res.remote};
110134
} else {
111135
const res = await this.repo.getIf({id: this.id, time: this.model.clock.time - 1, cursor: this.cursor});
@@ -209,6 +233,12 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
209233

210234
private onEvent = (event: LocalRepoEvent): void => {
211235
if (this._stopped) return;
236+
if ((event as LocalRepoMergeEvent).merge) {
237+
const cursor = (event as LocalRepoMergeEvent).cursor;
238+
if (cursor !== undefined) {
239+
this.cursor = cursor;
240+
}
241+
}
212242
if ((event as LocalRepoRebaseEvent).rebase) {
213243
if ((event as LocalRepoRebaseEvent).session === this.session) return;
214244
}

src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,38 @@ describe('sync', () => {
118118
await kit.stop();
119119
});
120120

121+
test('sessions created just after the first one, converges in state', async () => {
122+
const kit = await setup();
123+
const schema = s.obj({a: s.con('a')});
124+
const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1});
125+
const {session: session2} = kit.sessions.make({id: kit.blockId, schema, session: 2});
126+
await session1.sync();
127+
await session2.sync();
128+
expect(session1.log.patches.size()).toBe(0);
129+
session1.model.api.obj([]).set({b: 'b'});
130+
session1.model.api.obj([]).set({c: 'c'});
131+
await tick(5);
132+
session1.model.api.obj([]).set({d: 'd'});
133+
const {session: session3} = kit.sessions.make({id: kit.blockId, pull: true, schema, session: 3});
134+
await tick(5);
135+
session1.model.api.obj([]).set({e: 'e'});
136+
await tick(5);
137+
session1.model.api.obj([]).set({f: 'f'});
138+
await session1.sync();
139+
await until(async () => {
140+
try {
141+
expect(session1.model.view()).toEqual(session3.model.view());
142+
return true;
143+
} catch {
144+
return false;
145+
}
146+
});
147+
await session1.dispose();
148+
await session2.dispose();
149+
await session3.dispose();
150+
await kit.stop();
151+
});
152+
121153
test('sessions converge to the same view', async () => {
122154
const kit = await setup();
123155
const schema = s.obj({a: s.con('a')});
@@ -141,7 +173,21 @@ describe('sync', () => {
141173
await until(async () => {
142174
try {
143175
expect(session1.model.view()).toEqual(session2.model.view());
176+
return true;
177+
} catch {
178+
return false;
179+
}
180+
});
181+
await until(async () => {
182+
try {
144183
expect(session1.model.view()).toEqual(session3.model.view());
184+
return true;
185+
} catch {
186+
return false;
187+
}
188+
});
189+
await until(async () => {
190+
try {
145191
expect(session1.model.view()).toEqual(session4.model.view());
146192
return true;
147193
} catch {
@@ -155,6 +201,7 @@ describe('sync', () => {
155201
await session1.dispose();
156202
await session2.dispose();
157203
await session3.dispose();
204+
await session4.dispose();
158205
await kit.stop();
159206
});
160207
});

0 commit comments

Comments
 (0)