Skip to content

Commit b07a262

Browse files
committed
feat: 🎸 cleanup block.listen method
1 parent d805b21 commit b07a262

File tree

7 files changed

+170
-118
lines changed

7 files changed

+170
-118
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
"@jsonjoy.com/jit-router": "^1.0.1",
7575
"@jsonjoy.com/json-pack": "^1.0.2",
7676
"@jsonjoy.com/util": "^1.0.0",
77-
"json-joy": "^15.4.0",
77+
"json-joy": "^15.4.1",
7878
"memfs": "^4.8.1",
7979
"sonic-forest": "^1.0.0"
8080
},

src/__demos__/json-crdt-server/routes/block/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
BlockNew,
1515
BlockSnapshot,
1616
NewBlockSnapshotResponse,
17+
BlockEvent,
1718
} from './schema';
1819
import type {RouteDeps, Router, RouterBase} from '../types';
1920

@@ -34,13 +35,15 @@ export const block =
3435
system.alias('BlockPatchPartial', BlockPatchPartial);
3536
system.alias('BlockPatchPartialReturn', BlockPatchPartialReturn);
3637

38+
system.alias('BlockEvent', BlockEvent);
39+
3740
// prettier-ignore
3841
return (
3942
( new_(d)
4043
( get(d)
4144
( upd(d)
4245
( del(d)
43-
// ( listen(d)
46+
( listen(d)
4447
( scan(d)
45-
( r )))))));
48+
( r ))))))));
4649
};
Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
1-
import {switchMap} from 'rxjs';
1+
import {map, switchMap, tap} from 'rxjs';
2+
import {BlockEventRef, BlockIdRef} from '../schema';
23
import type {RouteDeps, Router, RouterBase} from '../../types';
3-
import type {BlockId, BlockPatch, Block} from '../schema';
44

55
export const listen =
66
({t, services}: RouteDeps) =>
77
<R extends RouterBase>(r: Router<R>) => {
88
const Request = t.Object(
9-
t.prop('id', t.Ref<typeof BlockId>('BlockId')).options({
9+
t.prop('id', BlockIdRef).options({
1010
title: 'Block ID',
1111
description: 'The ID of the block to subscribe to.',
1212
}),
1313
);
1414

15-
const Response = t.Or(
16-
t.Tuple(t.Const('del')),
17-
t.Tuple(
18-
t.Const('upd'),
19-
t.Object(
20-
t.propOpt('model', t.Ref<typeof Block>('Block')).options({
21-
title: 'Block',
22-
description: 'The whole block object, emitted only when the block is created.',
23-
}),
24-
t.propOpt('patches', t.Array(t.Ref<typeof BlockPatch>('BlockPatch'))).options({
25-
title: 'Latest Patches',
26-
description: 'Patches that have been applied to the block.',
27-
}),
28-
),
29-
),
15+
const Response = t.Object(
16+
t.prop('event', BlockEventRef),
3017
);
3118

3219
const Func = t.Function$(Request, Response).options({
3320
title: 'Listen for block changes',
34-
description: 'Subscribe to a block to receive updates when it changes.',
21+
description: `Subscribe to a block to receive updates when it changes.`,
3522
});
3623

3724
return r.prop('block.listen', Func, (req$) => {
38-
return req$.pipe(switchMap(({id}) => services.pubsub.listen$(`__block:${id}`))) as any;
25+
const response = req$.pipe(
26+
switchMap(({id}) => {
27+
return services.blocks.listen(id);
28+
}),
29+
map((event) => {
30+
return {
31+
event,
32+
};
33+
}),
34+
);
35+
return response;
3936
});
4037
};

src/__demos__/json-crdt-server/routes/block/schema.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {t} from '../system';
2+
import type {ResolveType} from 'json-joy/lib/json-type';
23

34
export const BlockId = t.str.options({
45
title: 'Block ID',
@@ -82,3 +83,32 @@ export const Block = BlockNew.extend(t.Object(
8283
t.prop('tip', t.Array(BlockPatchRef)),
8384
));
8485
export const BlockRef = t.Ref<typeof Block>('Block');
86+
87+
// --------------------------------------------------------------------- Events
88+
89+
export const BlockDeleteEvent = t.Tuple(
90+
t.Const(<const>'del').options({title: 'Event Type'}),
91+
).options({title: 'Delete Event'});
92+
93+
export const BlockUpdateEvent = t.Tuple(
94+
t.Const(<const>'upd').options({title: 'Event Type'}),
95+
t.Object(
96+
t.prop('patches', t.Array(BlockPatchRef)).options({
97+
title: 'Latest Patches',
98+
description: 'Patches that have been applied to the block.',
99+
}),
100+
).options({title: 'Event Data'}),
101+
).options({title: 'Update Event'});
102+
103+
export const BlockEvent = t.Or(
104+
BlockUpdateEvent,
105+
BlockDeleteEvent,
106+
).options({
107+
title: 'Block Event',
108+
description: 'A collection of possible events that can happen to a block.',
109+
});
110+
export const BlockEventRef = t.Ref<typeof BlockEvent>('BlockEvent');
111+
112+
export type TBlockDeleteEvent = ResolveType<typeof BlockDeleteEvent>;
113+
export type TBlockUpdateEvent = ResolveType<typeof BlockUpdateEvent>;
114+
export type TBlockEvent = ResolveType<typeof BlockEvent>;

src/__demos__/json-crdt-server/services/blocks/BlocksServices.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import {Model, Patch} from 'json-joy/lib/json-crdt';
44
import {SESSION} from 'json-joy/lib/json-crdt-patch/constants';
55
import type {StoreSnapshot, StorePatch} from './types';
66
import type {Services} from '../Services';
7+
import type {Observable} from 'rxjs';
8+
import type {TBlockEvent, TBlockUpdateEvent, TBlockDeleteEvent} from '../../routes/block/schema';
79

810
const BLOCK_TTL = 1000 * 60 * 30; // 30 minutes
911

1012
const validatePatches = (patches: Pick<StorePatch, 'blob'>[]) => {
1113
for (const patch of patches) {
1214
if (patch.blob.length > 2000) throw RpcError.validation('patch blob too large');
13-
// if (patch.seq > 500_000) throw RpcError.validation('patch seq too large');
1415
}
1516
};
1617

@@ -65,7 +66,10 @@ export class BlocksServices {
6566
}
6667

6768
private __emitUpd(id: string, patches: StorePatch[]) {
68-
const msg = ['upd', {patches}];
69+
const msg: TBlockUpdateEvent = ['upd', {patches: patches.map(patch => ({
70+
blob: patch.blob,
71+
ts: patch.created,
72+
}))}];
6973
this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => {
7074
// tslint:disable-next-line:no-console
7175
console.error('Error publishing block patches', error);
@@ -81,7 +85,7 @@ export class BlocksServices {
8185

8286
public async remove(id: string) {
8387
const deleted = await this.store.remove(id);
84-
const msg = ['del'];
88+
const msg: TBlockDeleteEvent = ['del'];
8589
this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => {
8690
// tslint:disable-next-line:no-console
8791
console.error('Error publishing block deletion', error);
@@ -140,6 +144,10 @@ export class BlocksServices {
140144
};
141145
}
142146

147+
public listen(id: string): Observable<TBlockEvent> {
148+
return this.services.pubsub.listen$(`__block:${id}`) as Observable<TBlockEvent>;
149+
}
150+
143151
public stats() {
144152
return this.store.stats();
145153
}

src/__tests__/json-crdt-server/block.ts

Lines changed: 102 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {RpcErrorCodes} from '../../common/rpc/caller';
44
import {tick, until} from 'thingies';
55
import type {ApiTestSetup} from '../../common/rpc/__tests__/runApiTests';
66
import type {JsonCrdtTestSetup} from '../../__demos__/json-crdt-server/__tests__/setup';
7+
import type {TBlockEvent} from '../../__demos__/json-crdt-server/routes/block/schema';
78

89
const sid = Math.random().toString(36).slice(2);
910
let seq = 0;
@@ -232,96 +233,109 @@ export const runBlockTests = (_setup: ApiTestSetup, params: {staticOnly?: true}
232233
});
233234
});
234235

235-
// if (!params.staticOnly) {
236-
// describe('block.listen', () => {
237-
// test('can listen for block changes', async () => {
238-
// const {call, call$, stop} = await setup();
239-
// const id = getId();
240-
// await call('block.new', {id, patches: []});
241-
// await tick(11);
242-
// const emits: any[] = [];
243-
// call$('block.listen', {id}).subscribe((data) => emits.push(data));
244-
// const model = Model.withLogicalClock();
245-
// model.api.root({
246-
// text: 'Hell',
247-
// });
248-
// const patch1 = model.api.flush();
249-
// await tick(12);
250-
// expect(emits.length).toBe(0);
251-
// await call('block.upd', {
252-
// id,
253-
// patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}],
254-
// });
255-
// await until(() => emits.length === 1);
256-
// expect(emits.length).toBe(1);
257-
// expect(emits[0][0]).toBe('upd');
258-
// expect(emits[0][1].patches.length).toBe(1);
259-
// expect(emits[0][1].patches[0].seq).toBe(0);
260-
// model.api.root({
261-
// text: 'Hello',
262-
// });
263-
// const patch2 = model.api.flush();
264-
// await tick(12);
265-
// expect(emits.length).toBe(1);
266-
// await call('block.upd', {
267-
// id,
268-
// patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}],
269-
// });
270-
// await until(() => emits.length === 2);
271-
// expect(emits.length).toBe(2);
272-
// expect(emits[1][0]).toBe('upd');
273-
// expect(emits[1][1].patches.length).toBe(1);
274-
// expect(emits[1][1].patches[0].seq).toBe(1);
275-
// stop();
276-
// });
236+
if (!params.staticOnly) {
237+
describe('block.listen', () => {
238+
test('can listen for block changes', async () => {
239+
const {call, call$, stop} = await setup();
240+
const id = getId();
241+
await call('block.new', {id, patches: []});
242+
await tick(11);
243+
const emits: TBlockEvent[] = [];
244+
call$('block.listen', {id}).subscribe(({event}) => emits.push(event));
245+
const model = Model.withLogicalClock();
246+
model.api.root({
247+
text: 'Hell',
248+
});
249+
const patch1 = model.api.flush();
250+
await tick(12);
251+
expect(emits.length).toBe(0);
252+
await call('block.upd', {
253+
id,
254+
patches: [{blob: patch1.toBinary()}],
255+
});
256+
await until(() => emits.length === 1);
257+
expect(emits.length).toBe(1);
258+
expect(emits[0][0]).toBe('upd');
259+
if (emits[0][0] === 'upd') {
260+
expect(emits[0][1].patches.length).toBe(1);
261+
expect(emits[0][1].patches[0]).toMatchObject({
262+
ts: expect.any(Number),
263+
blob: patch1.toBinary(),
264+
});
265+
}
266+
model.api.root({
267+
text: 'Hello',
268+
});
269+
const patch2 = model.api.flush();
270+
await tick(12);
271+
expect(emits.length).toBe(1);
272+
await call('block.upd', {
273+
id,
274+
patches: [{blob: patch2.toBinary()}],
275+
});
276+
await until(() => emits.length === 2);
277+
expect(emits.length).toBe(2);
278+
expect(emits[1][0]).toBe('upd');
279+
if (emits[1][0] === 'upd') {
280+
expect(emits[1][1].patches.length).toBe(1);
281+
expect(emits[1][1].patches[0]).toMatchObject({
282+
ts: expect.any(Number),
283+
blob: patch2.toBinary(),
284+
});
285+
}
286+
stop();
287+
});
277288

278-
// test('can subscribe before block is created', async () => {
279-
// const {call, call$, stop} = await setup();
280-
// const emits: any[] = [];
281-
// const id = getId();
282-
// call$('block.listen', {id}).subscribe((data) => emits.push(data));
283-
// const model = Model.withLogicalClock();
284-
// model.api.root({
285-
// text: 'Hell',
286-
// });
287-
// const patch1 = model.api.flush();
288-
// await tick(12);
289-
// expect(emits.length).toBe(0);
290-
// await call('block.new', {
291-
// id,
292-
// patches: [
293-
// {
294-
// blob: patch1.toBinary(),
295-
// },
296-
// ],
297-
// });
298-
// await until(() => emits.length === 1);
299-
// expect(emits.length).toBe(1);
300-
// expect(emits[0][0]).toBe('upd');
301-
// expect(emits[0][1].patches.length).toBe(1);
302-
// expect(emits[0][1].patches[0].seq).toBe(0);
303-
// expect(emits[0][1].patches[0].blob).toStrictEqual(patch1.toBinary());
304-
// stop();
305-
// });
289+
test('can subscribe before block is created', async () => {
290+
const {call, call$, stop} = await setup();
291+
const emits: TBlockEvent[] = [];
292+
const id = getId();
293+
call$('block.listen', {id}).subscribe(({event}) => emits.push(event));
294+
const model = Model.withLogicalClock();
295+
model.api.root({
296+
text: 'Hell',
297+
});
298+
const patch1 = model.api.flush();
299+
await tick(12);
300+
expect(emits.length).toBe(0);
301+
await call('block.new', {
302+
id,
303+
patches: [
304+
{
305+
blob: patch1.toBinary(),
306+
},
307+
],
308+
});
309+
await until(() => emits.length === 1);
310+
expect(emits.length).toBe(1);
311+
expect(emits[0][0]).toBe('upd');
312+
if (emits[0][0] === 'upd') {
313+
expect(emits[0][1].patches.length).toBe(1);
314+
expect(emits[0][1].patches[0]).toMatchObject({
315+
ts: expect.any(Number),
316+
blob: patch1.toBinary(),
317+
});
318+
}
319+
stop();
320+
});
306321

307-
// test('can receive deletion events', async () => {
308-
// const {call, call$, stop} = await setup();
309-
// const emits: any[] = [];
310-
// const id = getId();
311-
// call$('block.listen', {id}).subscribe((data) => {
312-
// emits.push(data);
313-
// });
314-
// await call('block.new', {id, patches: []});
315-
// await until(() => emits.length === 1);
316-
// expect(emits[0][1].model.seq).toBe(-1);
317-
// await tick(3);
318-
// await call('block.del', {id});
319-
// await until(() => emits.length === 2);
320-
// expect(emits[1][0]).toBe('del');
321-
// stop();
322-
// });
323-
// });
324-
// }
322+
test('can receive deletion events', async () => {
323+
const {call, call$, stop} = await setup();
324+
const emits: TBlockEvent[] = [];
325+
const id = getId();
326+
call$('block.listen', {id}).subscribe(({event}) => {
327+
emits.push(event);
328+
});
329+
await call('block.new', {id, patches: []});
330+
await until(() => emits.length === 1);
331+
await tick(3);
332+
await call('block.del', {id});
333+
await until(() => emits.length === 2);
334+
expect(emits[1][0]).toBe('del');
335+
stop();
336+
});
337+
});
338+
}
325339

326340
describe('block.scan', () => {
327341
test('can retrieve change history', async () => {

0 commit comments

Comments
 (0)