@@ -8,6 +8,7 @@ import com.powersync.db.crud.CrudRow
88import com.powersync.db.internal.InternalDatabase
99import com.powersync.db.internal.InternalTable
1010import com.powersync.db.internal.PowerSyncTransaction
11+ import com.powersync.sync.Instruction
1112import com.powersync.sync.SyncDataBatch
1213import com.powersync.sync.SyncLocalDatabaseResult
1314import com.powersync.utils.JsonUtil
@@ -21,14 +22,8 @@ internal class BucketStorageImpl(
2122 private var hasCompletedSync = AtomicBoolean (false )
2223 private var pendingBucketDeletes = AtomicBoolean (false )
2324
24- /* *
25- * Count up, and do a compact on startup.
26- */
27- private var compactCounter = COMPACT_OPERATION_INTERVAL
28-
2925 companion object {
3026 const val MAX_OP_ID = " 9223372036854775807"
31- const val COMPACT_OPERATION_INTERVAL = 1_000
3227 }
3328
3429 override fun getMaxOpId (): String = MAX_OP_ID
@@ -130,50 +125,6 @@ internal class BucketStorageImpl(
130125 }
131126 }
132127
133- override suspend fun saveSyncData (syncDataBatch : SyncDataBatch ) {
134- db.writeTransaction { tx ->
135- val jsonString = JsonUtil .json.encodeToString(syncDataBatch)
136- tx.execute(
137- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
138- listOf (" save" , jsonString),
139- )
140- }
141- this .compactCounter + = syncDataBatch.buckets.sumOf { it.data.size }
142- }
143-
144- override suspend fun getBucketStates (): List <BucketState > =
145- db.getAll(
146- " SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable .BUCKETS } WHERE pending_delete = 0 AND name != '\$ local'" ,
147- mapper = { cursor ->
148- BucketState (
149- bucket = cursor.getString(0 )!! ,
150- opId = cursor.getString(1 )!! ,
151- )
152- },
153- )
154-
155- override suspend fun getBucketOperationProgress (): Map <String , LocalOperationCounters > =
156- buildMap {
157- val rows =
158- db.getAll(" SELECT name, count_at_last, count_since_last FROM ps_buckets" ) { cursor ->
159- cursor.getString(0 )!! to
160- LocalOperationCounters (
161- atLast = cursor.getLong(1 )!! .toInt(),
162- sinceLast = cursor.getLong(2 )!! .toInt(),
163- )
164- }
165-
166- for ((name, counters) in rows) {
167- put(name, counters)
168- }
169- }
170-
171- override suspend fun removeBuckets (bucketsToDelete : List <String >) {
172- bucketsToDelete.forEach { bucketName ->
173- deleteBucket(bucketName)
174- }
175- }
176-
177128 private suspend fun deleteBucket (bucketName : String ) {
178129 db.writeTransaction { tx ->
179130 tx.execute(
@@ -208,202 +159,26 @@ internal class BucketStorageImpl(
208159 }
209160 }
210161
211- override suspend fun syncLocalDatabase (
212- targetCheckpoint : Checkpoint ,
213- partialPriority : BucketPriority ? ,
214- ): SyncLocalDatabaseResult {
215- val result = validateChecksums(targetCheckpoint, partialPriority)
216-
217- if (! result.checkpointValid) {
218- logger.w { " [SyncLocalDatabase] Checksums failed for ${result.checkpointFailures} " }
219- result.checkpointFailures?.forEach { bucketName ->
220- deleteBucket(bucketName)
221- }
222- result.ready = false
223- return result
224- }
225-
226- val bucketNames =
227- targetCheckpoint.checksums
228- .let {
229- if (partialPriority == null ) {
230- it
231- } else {
232- it.filter { cs -> cs.priority >= partialPriority }
233- }
234- }.map { it.bucket }
235-
236- db.writeTransaction { tx ->
237- tx.execute(
238- " UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))" ,
239- listOf (targetCheckpoint.lastOpId, JsonUtil .json.encodeToString(bucketNames)),
240- )
241-
242- if (partialPriority == null && targetCheckpoint.writeCheckpoint != null ) {
243- tx.execute(
244- " UPDATE ps_buckets SET last_op = ? WHERE name = '\$ local'" ,
245- listOf (targetCheckpoint.writeCheckpoint),
246- )
247- }
248- }
249-
250- val valid = updateObjectsFromBuckets(targetCheckpoint, partialPriority)
251-
252- if (! valid) {
253- return SyncLocalDatabaseResult (
254- ready = false ,
255- checkpointValid = true ,
256- )
257- }
258-
259- this .forceCompact()
260-
261- return SyncLocalDatabaseResult (
262- ready = true ,
263- )
264- }
265-
266- private suspend fun validateChecksums (
267- checkpoint : Checkpoint ,
268- priority : BucketPriority ? = null,
269- ): SyncLocalDatabaseResult {
270- val serializedCheckpoint =
271- JsonUtil .json.encodeToString(
272- when (priority) {
273- null -> checkpoint
274- // Only validate buckets with a priority included in this partial sync.
275- else -> checkpoint.copy(checksums = checkpoint.checksums.filter { it.priority >= priority })
276- },
277- )
278-
279- val res =
280- db.getOptional(
281- " SELECT powersync_validate_checkpoint(?) AS result" ,
282- parameters = listOf (serializedCheckpoint),
283- mapper = { cursor ->
284- cursor.getString(0 )!!
285- },
286- )
287- ? : // no result
288- return SyncLocalDatabaseResult (
289- ready = false ,
290- checkpointValid = false ,
291- )
162+ private fun handleControlResult (cursor : SqlCursor ): List <Instruction > {
163+ val result = cursor.getString(0 )!!
164+ logger.v { " control result: $result " }
292165
293- return JsonUtil .json.decodeFromString<SyncLocalDatabaseResult >(res )
166+ return JsonUtil .json.decodeFromString<List < Instruction >>(result )
294167 }
295168
296- /* *
297- * Atomically update the local state.
298- *
299- * This includes creating new tables, dropping old tables, and copying data over from the oplog.
300- */
301- private suspend fun updateObjectsFromBuckets (
302- checkpoint : Checkpoint ,
303- priority : BucketPriority ? = null,
304- ): Boolean {
305- @Serializable
306- data class SyncLocalArgs (
307- val priority : BucketPriority ,
308- val buckets : List <String >,
309- )
310-
311- val args =
312- if (priority != null ) {
313- JsonUtil .json.encodeToString(
314- SyncLocalArgs (
315- priority = priority,
316- buckets = checkpoint.checksums.filter { it.priority >= priority }.map { it.bucket },
317- ),
318- )
319- } else {
320- " "
321- }
322-
169+ override suspend fun control (op : String , payload : String? ): List <Instruction > {
323170 return db.writeTransaction { tx ->
324- tx.execute(
325- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
326- listOf (" sync_local" , args),
327- )
328-
329- val res =
330- tx.get(" select last_insert_rowid()" ) { cursor ->
331- cursor.getLong(0 )!!
332- }
333-
334- val didApply = res == 1L
335- if (didApply && priority == null ) {
336- // Reset progress counters. We only do this for a complete sync, as we want a download progress to
337- // always cover a complete checkpoint instead of resetting for partial completions.
338- tx.execute(
339- """
340- UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
341- WHERE ?1->name IS NOT NULL
342- """ .trimIndent(),
343- listOf (
344- JsonUtil .json.encodeToString(
345- buildMap<String , Int > {
346- for (bucket in checkpoint.checksums) {
347- bucket.count?.let { put(bucket.bucket, it) }
348- }
349- },
350- ),
351- ),
352- )
353- }
354-
355- return @writeTransaction didApply
356- }
357- }
171+ logger.v { " powersync_control($op , $payload )" }
358172
359- private suspend fun forceCompact () {
360- // Reset counter
361- this .compactCounter = COMPACT_OPERATION_INTERVAL
362- this .pendingBucketDeletes.value = true
363-
364- this .autoCompact()
365- }
366-
367- private suspend fun autoCompact () {
368- // 1. Delete buckets
369- deletePendingBuckets()
370-
371- // 2. Clear REMOVE operations, only keeping PUT ones
372- clearRemoveOps()
373- }
374-
375- private suspend fun deletePendingBuckets () {
376- if (! this .pendingBucketDeletes.value) {
377- return
378- }
379-
380- db.writeTransaction { tx ->
381- tx.execute(
382- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
383- listOf (" delete_pending_buckets" , " " ),
384- )
385-
386- // Executed once after start-up, and again when there are pending deletes.
387- pendingBucketDeletes.value = false
173+ tx.get(" SELECT powersync_control(?, ?) AS r" , listOf (op, payload), ::handleControlResult)
388174 }
389175 }
390176
391- private suspend fun clearRemoveOps () {
392- if (this .compactCounter < COMPACT_OPERATION_INTERVAL ) {
393- return
394- }
177+ override suspend fun control (op : String , payload : ByteArray ): List <Instruction > {
178+ return db.writeTransaction { tx ->
179+ logger.v { " powersync_control($op , binary payload)" }
395180
396- db.writeTransaction { tx ->
397- tx.execute(
398- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
399- listOf (" clear_remove_ops" , " " ),
400- )
181+ tx.get(" SELECT powersync_control(?, ?) AS r" , listOf (op, payload), ::handleControlResult)
401182 }
402- this .compactCounter = 0
403- }
404-
405- @Suppress(" UNUSED_PARAMETER" )
406- override fun setTargetCheckpoint (checkpoint : Checkpoint ) {
407- // No-op for now
408183 }
409184}
0 commit comments