@@ -10,10 +10,11 @@ import com.powersync.bucket.OplogEntry
1010import com.powersync.testutils.ActiveDatabaseTest
1111import com.powersync.testutils.databaseTest
1212import com.powersync.testutils.waitFor
13+ import io.kotest.assertions.withClue
14+ import io.kotest.matchers.properties.shouldHaveValue
1315import kotlinx.coroutines.channels.Channel
1416import kotlin.test.BeforeTest
1517import kotlin.test.Test
16- import kotlin.test.assertEquals
1718import kotlin.test.assertFalse
1819import kotlin.test.assertNull
1920import kotlin.test.assertTrue
@@ -80,6 +81,16 @@ abstract class BaseSyncProgressTest(
8081 }
8182 }
8283
84+ private fun ProgressWithOperations.shouldBe (
85+ downloaded : Int ,
86+ total : Int ,
87+ ) {
88+ withClue(" progress $downloadedOperations /$totalOperations should be $downloaded /$total " ) {
89+ this ::downloadedOperations shouldHaveValue downloaded
90+ this ::totalOperations shouldHaveValue total
91+ }
92+ }
93+
8394 private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress (
8495 total : Pair <Int , Int >,
8596 priorities : Map <BucketPriority , Pair <Int , Int >> = emptyMap(),
@@ -88,14 +99,12 @@ abstract class BaseSyncProgressTest(
8899 val progress = item.downloadProgress ? : error(" Expected download progress on $item " )
89100
90101 assertTrue { item.downloading }
91- assertEquals(total.first, progress.downloadedOperations)
92- assertEquals(total.second, progress.totalOperations)
102+ progress.shouldBe(total.first, total.second)
93103
94104 priorities.forEach { (priority, expected) ->
95105 val (expectedDownloaded, expectedTotal) = expected
96- val progress = progress.untilPriority(priority)
97- assertEquals(expectedDownloaded, progress.downloadedOperations)
98- assertEquals(expectedTotal, progress.totalOperations)
106+ val actualProgress = progress.untilPriority(priority)
107+ actualProgress.shouldBe(expectedDownloaded, expectedTotal)
99108 }
100109 }
101110
@@ -278,6 +287,61 @@ abstract class BaseSyncProgressTest(
278287 syncLines.close()
279288 }
280289
290+ @Test
291+ fun interruptedWithDefrag () =
292+ databaseTest {
293+ database.connect(connector)
294+
295+ turbineScope {
296+ val turbine = database.currentStatus.asFlow().testIn(this )
297+ turbine.waitFor { it.connected && ! it.downloading }
298+ syncLines.send(
299+ SyncLine .FullCheckpoint (
300+ Checkpoint (
301+ lastOpId = " 10" ,
302+ checksums = listOf (bucket(" a" , 10 )),
303+ ),
304+ ),
305+ )
306+ turbine.expectProgress(0 to 10 )
307+
308+ addDataLine(" a" , 5 )
309+ turbine.expectProgress(5 to 10 )
310+
311+ turbine.cancel()
312+ }
313+
314+ // Close and re-connect
315+ database.close()
316+ syncLines.close()
317+ database = openDatabase()
318+ syncLines = Channel ()
319+ database.connect(connector)
320+
321+ turbineScope {
322+ val turbine = database.currentStatus.asFlow().testIn(this )
323+ turbine.waitFor { it.connected && ! it.downloading }
324+
325+ // A sync rule deploy could reset buckets, making the new bucket smaller than the
326+ // existing one.
327+ syncLines.send(
328+ SyncLine .FullCheckpoint (
329+ Checkpoint (
330+ lastOpId = " 14" ,
331+ checksums = listOf (bucket(" a" , 4 )),
332+ ),
333+ ),
334+ )
335+
336+ // In this special case, don't report 5/4 as progress
337+ turbine.expectProgress(0 to 4 )
338+ turbine.cancel()
339+ }
340+
341+ database.close()
342+ syncLines.close()
343+ }
344+
281345 @Test
282346 fun differentPriorities () =
283347 databaseTest {
0 commit comments