@@ -2,21 +2,30 @@ use alloc::collections::BTreeSet;
22use alloc:: format;
33use alloc:: string:: String ;
44
5+ use crate :: bucket_priority:: BucketPriority ;
56use crate :: error:: { PSResult , SQLiteError } ;
6- use sqlite_nostd as sqlite;
7+ use sqlite_nostd:: { self as sqlite, Value } ;
78use sqlite_nostd:: { ColumnType , Connection , ResultCode } ;
89
910use crate :: ext:: SafeManagedStmt ;
1011use crate :: util:: { internal_table_name, quote_internal_name} ;
1112
12- pub fn can_update_local ( db : * mut sqlite:: sqlite3 ) -> Result < bool , SQLiteError > {
13+ fn can_apply_sync_changes (
14+ db : * mut sqlite:: sqlite3 ,
15+ priority : BucketPriority ,
16+ ) -> Result < bool , SQLiteError > {
17+ // We can only make sync changes visible if data is consistent, meaning that we've seen the
18+ // target operation sent in the original checkpoint message. We allow weakening consistency when
19+ // buckets from different priorities are involved (buckets with higher priorities or a lower
20+ // priority number can be published before we've reached the checkpoint for other buckets).
1321 // language=SQLite
1422 let statement = db. prepare_v2 (
1523 "\
1624 SELECT group_concat(name)
1725FROM ps_buckets
18- WHERE target_op > last_op" ,
26+ WHERE ( target_op > last_op) AND (priority <= ?) " ,
1927 ) ?;
28+ statement. bind_int ( 1 , priority. into ( ) ) ?;
2029
2130 if statement. step ( ) ? != ResultCode :: ROW {
2231 return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
@@ -26,22 +35,34 @@ WHERE target_op > last_op",
2635 return Ok ( false ) ;
2736 }
2837
29- // This is specifically relevant for when data is added to crud before another batch is completed.
30-
31- // language=SQLite
32- let statement = db. prepare_v2 ( "SELECT 1 FROM ps_crud LIMIT 1" ) ?;
33- if statement. step ( ) ? != ResultCode :: DONE {
34- return Ok ( false ) ;
38+ // Don't publish downloaded data until the upload queue is empty (except for downloaded data in
39+ // priority 0, which is published earlier).
40+ if !priority. may_publish_with_outstanding_uploads ( ) {
41+ let statement = db. prepare_v2 ( "SELECT 1 FROM ps_crud LIMIT 1" ) ?;
42+ if statement. step ( ) ? != ResultCode :: DONE {
43+ return Ok ( false ) ;
44+ }
3545 }
3646
3747 Ok ( true )
3848}
3949
40- pub fn sync_local ( db : * mut sqlite:: sqlite3 , _data : & str ) -> Result < i64 , SQLiteError > {
41- if !can_update_local ( db) ? {
50+ pub fn sync_local < V : Value > ( db : * mut sqlite:: sqlite3 , data : & V ) -> Result < i64 , SQLiteError > {
51+ let priority = match data. value_type ( ) {
52+ ColumnType :: Integer => BucketPriority :: try_from ( data. int ( ) ) ,
53+ ColumnType :: Float => BucketPriority :: try_from ( data. double ( ) as i32 ) ,
54+ // Older clients without bucket priority support typically send an empty string here.
55+ _ => Ok ( BucketPriority :: LOWEST ) ,
56+ } ?;
57+
58+ if !can_apply_sync_changes ( db, priority) ? {
4259 return Ok ( 0 ) ;
4360 }
4461
62+ if priority >= BucketPriority :: LOWEST {
63+ todo ! ( "Only consider changes from certain bucket priorities" )
64+ }
65+
4566 // language=SQLite
4667 let statement = db
4768 . prepare_v2 ( "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" )
0 commit comments