@@ -10,6 +10,7 @@ use alloc::{
1010 collections:: { btree_map:: BTreeMap , btree_set:: BTreeSet } ,
1111 format,
1212 string:: { String , ToString } ,
13+ sync:: Arc ,
1314 vec:: Vec ,
1415} ;
1516use futures_lite:: FutureExt ;
@@ -18,6 +19,7 @@ use crate::{
1819 bson,
1920 error:: SQLiteError ,
2021 kv:: client_id,
22+ state:: DatabaseState ,
2123 sync:: { checkpoint:: OwnedBucketChecksum , interface:: StartSyncStream } ,
2224} ;
2325use sqlite_nostd:: { self as sqlite, ResultCode } ;
@@ -37,14 +39,16 @@ use super::{
3739/// initialized.
3840pub struct SyncClient {
3941 db : * mut sqlite:: sqlite3 ,
42+ db_state : Arc < DatabaseState > ,
4043 /// The current [ClientState] (essentially an optional [StreamingSyncIteration]).
4144 state : ClientState ,
4245}
4346
4447impl SyncClient {
45- pub fn new ( db : * mut sqlite:: sqlite3 ) -> Self {
48+ pub fn new ( db : * mut sqlite:: sqlite3 , state : Arc < DatabaseState > ) -> Self {
4649 Self {
4750 db,
51+ db_state : state,
4852 state : ClientState :: Idle ,
4953 }
5054 }
@@ -57,7 +61,7 @@ impl SyncClient {
5761 SyncControlRequest :: StartSyncStream ( options) => {
5862 self . state . tear_down ( ) ?;
5963
60- let mut handle = SyncIterationHandle :: new ( self . db , options) ?;
64+ let mut handle = SyncIterationHandle :: new ( self . db , options, self . db_state . clone ( ) ) ?;
6165 let instructions = handle. initialize ( ) ?;
6266 self . state = ClientState :: IterationActive ( handle) ;
6367
@@ -125,10 +129,15 @@ struct SyncIterationHandle {
125129impl SyncIterationHandle {
126130 /// Creates a new sync iteration in a pending state by preparing statements for
127131 /// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] .
128- fn new ( db : * mut sqlite:: sqlite3 , options : StartSyncStream ) -> Result < Self , ResultCode > {
132+ fn new (
133+ db : * mut sqlite:: sqlite3 ,
134+ options : StartSyncStream ,
135+ state : Arc < DatabaseState > ,
136+ ) -> Result < Self , ResultCode > {
129137 let runner = StreamingSyncIteration {
130138 db,
131139 options,
140+ state,
132141 adapter : StorageAdapter :: new ( db) ?,
133142 status : SyncStatusContainer :: new ( ) ,
134143 } ;
@@ -192,6 +201,7 @@ impl<'a> ActiveEvent<'a> {
192201
193202struct StreamingSyncIteration {
194203 db : * mut sqlite:: sqlite3 ,
204+ state : Arc < DatabaseState > ,
195205 adapter : StorageAdapter ,
196206 options : StartSyncStream ,
197207 status : SyncStatusContainer ,
@@ -246,9 +256,12 @@ impl StreamingSyncIteration {
246256 SyncEvent :: BinaryLine { data } => bson:: from_bytes ( data) ?,
247257 SyncEvent :: UploadFinished => {
248258 if let Some ( checkpoint) = validated_but_not_applied. take ( ) {
249- let result =
250- self . adapter
251- . sync_local ( & checkpoint, None , & self . options . schema ) ?;
259+ let result = self . adapter . sync_local (
260+ & self . state ,
261+ & checkpoint,
262+ None ,
263+ & self . options . schema ,
264+ ) ?;
252265
253266 match result {
254267 SyncLocalResult :: ChangesApplied => {
@@ -324,9 +337,9 @@ impl StreamingSyncIteration {
324337 ) ,
325338 ) ) ;
326339 } ;
327- let result = self
328- . adapter
329- . sync_local ( target, None , & self . options . schema ) ?;
340+ let result =
341+ self . adapter
342+ . sync_local ( & self . state , target, None , & self . options . schema ) ?;
330343
331344 match result {
332345 SyncLocalResult :: ChecksumFailure ( checkpoint_result) => {
@@ -369,9 +382,12 @@ impl StreamingSyncIteration {
369382 ) ,
370383 ) ) ;
371384 } ;
372- let result =
373- self . adapter
374- . sync_local ( target, Some ( priority) , & self . options . schema ) ?;
385+ let result = self . adapter . sync_local (
386+ & self . state ,
387+ target,
388+ Some ( priority) ,
389+ & self . options . schema ,
390+ ) ?;
375391
376392 match result {
377393 SyncLocalResult :: ChecksumFailure ( checkpoint_result) => {
0 commit comments