File tree Expand file tree Collapse file tree 3 files changed +19
-6
lines changed
Expand file tree Collapse file tree 3 files changed +19
-6
lines changed Original file line number Diff line number Diff line change @@ -18,6 +18,7 @@ export interface JsonCrdtRepoOpts {
1818export class JsonCrdtRepo {
1919 public readonly sessions : EditSessionFactory ;
2020 public readonly opts : JsonCrdtRepoOpts ;
21+ public readonly remote : DemoServerRemoteHistory ;
2122
2223 constructor ( opts : Partial < JsonCrdtRepoOpts > ) {
2324 this . opts = {
@@ -26,7 +27,7 @@ export class JsonCrdtRepo {
2627 ...opts ,
2728 } ;
2829 const client = createBinaryWsRpcClient ( this . opts . wsUrl ) as DemoServerClient ;
29- const remote = new DemoServerRemoteHistory ( client ) ;
30+ this . remote = new DemoServerRemoteHistory ( client ) ;
3031 const kv : BinStrLevel = new BrowserLevel ( this . opts . name , {
3132 keyEncoding : 'utf8' ,
3233 valueEncoding : 'view' ,
@@ -39,7 +40,7 @@ export class JsonCrdtRepo {
3940 kv,
4041 locks,
4142 sid,
42- rpc : remote ,
43+ rpc : this . remote ,
4344 pubsub,
4445 connected$,
4546 } ) ;
Original file line number Diff line number Diff line change @@ -34,11 +34,21 @@ export class EditSession {
3434 protected readonly session : number = Math . floor ( Math . random ( ) * 0x7fffffff ) ,
3535 ) {
3636 this . log = new Log ( ( ) => this . start . clone ( ) ) ;
37- const flushUnsubscribe = this . log . end . api . onFlush . listen ( ( a ) => {
37+ const api = this . log . end . api ;
38+ const flushUnsubscribe = api . onFlush . listen ( ( a ) => {
3839 this . syncLog ( ) ;
3940 } ) ;
41+ const patchUnsubscribe = api . onPatch . listen ( ( patch ) => {
42+ const id = patch . getId ( ) ;
43+ if ( ! id ) return ;
44+ const clock = this . model . clock ;
45+ if ( id . sid === clock . sid && clock . time >= id . time ) {
46+ this . syncLog ( ) ;
47+ }
48+ } ) ;
4049 this . _stop$ . pipe ( first ( ) ) . subscribe ( ( ) => {
4150 flushUnsubscribe ( ) ;
51+ patchUnsubscribe ( ) ;
4252 } ) ;
4353 this . repo . change$ ( this . id ) . pipe ( takeUntil ( this . _stop$ ) ) . subscribe ( this . onEvent ) ;
4454 }
@@ -114,8 +124,10 @@ export class EditSession {
114124
115125 public syncLog ( ) : void {
116126 if ( ! this . log . patches . size ( ) ) return ;
117- this . sync ( ) . then ( ( error ) => {
118- this . onsyncerror ?.( error ) ;
127+ this . _syncRace ( ( ) => {
128+ this . sync ( ) . then ( ( error ) => {
129+ this . onsyncerror ?.( error ) ;
130+ } ) ;
119131 } ) ;
120132 }
121133
Original file line number Diff line number Diff line change 11import { s } from 'json-joy/lib/json-crdt' ;
22import { setup } from './setup' ;
3+ import { EditSessionFactory } from '../EditSessionFactory' ;
34import { until } from 'thingies/lib/until' ;
45import { of } from 'thingies' ;
5- import { EditSessionFactory } from '../EditSessionFactory' ;
66import { BehaviorSubject } from 'rxjs' ;
77import { Testbed } from '../../__tests__/testbed' ;
88
You can’t perform that action at this time.
0 commit comments