Skip to content

Commit 15a7674

Browse files
committed
Test token expiry handling
1 parent 6e127f1 commit 15a7674

File tree

3 files changed

+101
-13
lines changed

3 files changed

+101
-13
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@ import com.powersync.testutils.UserRow
1616
import com.powersync.testutils.databaseTest
1717
import com.powersync.testutils.waitFor
1818
import com.powersync.utils.JsonUtil
19+
import dev.mokkery.answering.returns
20+
import dev.mokkery.every
21+
import dev.mokkery.everySuspend
1922
import dev.mokkery.verify
23+
import dev.mokkery.verifyNoMoreCalls
24+
import dev.mokkery.verifySuspend
2025
import io.kotest.matchers.collections.shouldHaveSize
2126
import io.kotest.matchers.shouldBe
2227
import kotlinx.coroutines.CompletableDeferred
2328
import kotlinx.coroutines.DelicateCoroutinesApi
29+
import kotlinx.coroutines.launch
2430
import kotlinx.serialization.encodeToString
2531
import kotlin.test.Test
2632
import kotlin.test.assertEquals
@@ -526,4 +532,62 @@ class SyncIntegrationTest {
526532
// Meaning that the two rows are now visible
527533
database.expectUserCount(2)
528534
}
535+
536+
@Test
537+
fun testTokenExpired() =
538+
databaseTest {
539+
turbineScope(timeout = 10.0.seconds) {
540+
val turbine = database.currentStatus.asFlow().testIn(this)
541+
542+
database.connect(connector, 1000L, retryDelayMs = 5000)
543+
turbine.waitFor { it.connecting }
544+
545+
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
546+
turbine.waitFor { it.connected }
547+
verifySuspend { connector.getCredentialsCached() }
548+
verifyNoMoreCalls(connector)
549+
550+
// Should invalidate credentials when token expires
551+
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0))
552+
turbine.waitFor { !it.connected }
553+
verify { connector.invalidateCredentials() }
554+
555+
turbine.cancel()
556+
}
557+
}
558+
559+
@Test
560+
fun testTokenPrefetch() =
561+
databaseTest {
562+
val prefetchCalled = CompletableDeferred<Unit>()
563+
val completePrefetch = CompletableDeferred<Unit>()
564+
every { connector.prefetchCredentials() } returns scope.launch {
565+
prefetchCalled.complete(Unit)
566+
completePrefetch.await()
567+
}
568+
569+
turbineScope(timeout = 10.0.seconds) {
570+
val turbine = database.currentStatus.asFlow().testIn(this)
571+
572+
database.connect(connector, 1000L, retryDelayMs = 5000)
573+
turbine.waitFor { it.connecting }
574+
575+
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
576+
turbine.waitFor { it.connected }
577+
verifySuspend { connector.getCredentialsCached() }
578+
verifyNoMoreCalls(connector)
579+
580+
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
581+
prefetchCalled.complete(Unit)
582+
// Should still be connected before prefetch completes
583+
database.currentStatus.connected shouldBe true
584+
585+
// After the prefetch completes, we should reconnect
586+
completePrefetch.complete(Unit)
587+
turbine.waitFor { !it.connected }
588+
589+
turbine.waitFor { it.connected }
590+
turbine.cancel()
591+
}
592+
}
529593
}

core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,19 @@ public abstract class PowerSyncBackendConnector {
5656
* This may be called before the current credentials have expired.
5757
*/
5858
@Throws(PowerSyncException::class, CancellationException::class)
59-
public open suspend fun prefetchCredentials(): Job? {
59+
public open fun prefetchCredentials(): Job {
6060
fetchRequest?.takeIf { it.isActive }?.let { return it }
6161

62-
fetchRequest =
62+
val request =
6363
scope.launch {
6464
fetchCredentials().also { value ->
6565
cachedCredentials = value
6666
fetchRequest = null
6767
}
6868
}
6969

70-
return fetchRequest
70+
fetchRequest = request
71+
return request
7172
}
7273

7374
/**

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import kotlinx.coroutines.NonCancellable
4747
import kotlinx.coroutines.cancelAndJoin
4848
import kotlinx.coroutines.channels.BufferOverflow
4949
import kotlinx.coroutines.channels.Channel
50+
import kotlinx.coroutines.coroutineScope
5051
import kotlinx.coroutines.delay
5152
import kotlinx.coroutines.flow.Flow
5253
import kotlinx.coroutines.flow.emitAll
@@ -256,21 +257,25 @@ internal class SyncStream(
256257
}
257258

258259
private suspend fun streamingSyncIteration() {
259-
val iteration = ActiveIteration()
260-
261-
try {
262-
iteration.start()
263-
} finally {
264-
// This can't be cancelled because we need to send a stop message, which is async, to
265-
// clean up resources.
266-
withContext(NonCancellable) {
267-
iteration.stop()
260+
coroutineScope {
261+
val iteration = ActiveIteration(this)
262+
263+
try {
264+
iteration.start()
265+
} finally {
266+
// This can't be cancelled because we need to send a stop message, which is async, to
267+
// clean up resources.
268+
withContext(NonCancellable) {
269+
iteration.stop()
270+
}
268271
}
269272
}
270273
}
271274

272275
private inner class ActiveIteration(
276+
val scope: CoroutineScope,
273277
var fetchLinesJob: Job? = null,
278+
var credentialsInvalidation: Job? = null,
274279
) {
275280
suspend fun start() {
276281
control("start", JsonUtil.json.encodeToString(params))
@@ -336,7 +341,25 @@ internal class SyncStream(
336341
applyCoreChanges(instruction.status)
337342
}
338343
}
339-
is Instruction.FetchCredentials -> TODO()
344+
is Instruction.FetchCredentials -> {
345+
if (instruction.didExpire) {
346+
connector.invalidateCredentials()
347+
} else {
348+
// Token expires soon - refresh it in the background
349+
if (credentialsInvalidation == null) {
350+
val job = scope.launch {
351+
connector.prefetchCredentials().join()
352+
353+
// Token has been refreshed, start another iteration
354+
stop()
355+
}
356+
job.invokeOnCompletion {
357+
credentialsInvalidation = null
358+
}
359+
credentialsInvalidation = job
360+
}
361+
}
362+
}
340363
Instruction.DidCompleteSync -> status.update { copy(downloadError=null) }
341364
is Instruction.UnknownInstruction -> {
342365
throw PowerSyncException("Unknown instruction received from core extension: ${instruction.raw}", null)

0 commit comments

Comments
 (0)