Skip to content

Commit a472d50

Browse files
committed
Start with rsocket support
1 parent 3afb537 commit a472d50

File tree

7 files changed

+116
-4
lines changed

7 files changed

+116
-4
lines changed

core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ kotlin {
174174
implementation(libs.ktor.client.contentnegotiation)
175175
implementation(libs.ktor.serialization.json)
176176
implementation(libs.kotlinx.io)
177+
implementation(libs.rsocket.client)
177178
implementation(libs.kotlinx.coroutines.core)
178179
implementation(libs.kotlinx.datetime)
179180
implementation(libs.stately.concurrency)

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.powersync.db.Queries
66
import com.powersync.db.crud.CrudBatch
77
import com.powersync.db.crud.CrudTransaction
88
import com.powersync.db.schema.Schema
9+
import com.powersync.sync.SyncOptions
910
import com.powersync.sync.SyncStatus
1011
import com.powersync.utils.JsonParam
1112
import kotlin.coroutines.cancellation.CancellationException
@@ -94,6 +95,7 @@ public interface PowerSyncDatabase : Queries {
9495
crudThrottleMs: Long = 1000L,
9596
retryDelayMs: Long = 5000L,
9697
params: Map<String, JsonParam?> = emptyMap(),
98+
options: SyncOptions = SyncOptions()
9799
)
98100

99101
/**

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import com.powersync.db.internal.PowerSyncVersion
1818
import com.powersync.db.schema.Schema
1919
import com.powersync.db.schema.toSerializable
2020
import com.powersync.sync.PriorityStatusEntry
21+
import com.powersync.sync.SyncOptions
2122
import com.powersync.sync.SyncStatus
2223
import com.powersync.sync.SyncStatusData
2324
import com.powersync.sync.SyncStream
@@ -153,6 +154,7 @@ internal class PowerSyncDatabaseImpl(
153154
crudThrottleMs: Long,
154155
retryDelayMs: Long,
155156
params: Map<String, JsonParam?>,
157+
options: SyncOptions
156158
) {
157159
waitReady()
158160
mutex.withLock {
@@ -168,13 +170,13 @@ internal class PowerSyncDatabaseImpl(
168170
params = params.toJsonObject(),
169171
scope = scope,
170172
createClient = createClient,
173+
options = options,
171174
),
172175
crudThrottleMs,
173176
)
174177
}
175178
}
176179

177-
@OptIn(FlowPreview::class)
178180
internal fun connectInternal(
179181
stream: SyncStream,
180182
crudThrottleMs: Long,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.powersync.sync
2+
3+
import io.rsocket.kotlin.keepalive.KeepAlive
4+
import kotlin.time.Duration.Companion.seconds
5+
6+
public class SyncOptions(
7+
public val method: ConnectionMethod = ConnectionMethod.WebSocket,
8+
)
9+
10+
/**
11+
* The connection method to use when the SDK connects to the sync service.
12+
*/
13+
public sealed interface ConnectionMethod {
14+
/**
15+
* Receive sync lines via an streamed HTTP response from the sync service.
16+
*
17+
* This mode is less efficient than [WebSocket] because it doesn't support backpressure
18+
* properly and uses JSON instead of the more efficient BSON representation for sync lines.
19+
*/
20+
public data object Http: ConnectionMethod
21+
22+
/**
23+
* Receive binary sync lines via RSocket over a WebSocket connection.
24+
*
25+
* This is the default mode, and recommended for most clients.
26+
*/
27+
public data class WebSocket(
28+
val keepAlive: KeepAlive = DefaultKeepAlive
29+
): ConnectionMethod {
30+
private companion object {
31+
val DefaultKeepAlive = KeepAlive(
32+
interval = 20.0.seconds,
33+
maxLifetime = 30.0.seconds,
34+
)
35+
}
36+
}
37+
}

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.powersync.sync
22

3+
import BuildConfig
34
import co.touchlab.kermit.Logger
45
import co.touchlab.kermit.Severity
56
import co.touchlab.stately.concurrency.AtomicBoolean
@@ -18,6 +19,7 @@ import io.ktor.client.call.body
1819
import io.ktor.client.plugins.HttpTimeout
1920
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
2021
import io.ktor.client.plugins.timeout
22+
import io.ktor.client.plugins.websocket.WebSockets
2123
import io.ktor.client.request.get
2224
import io.ktor.client.request.headers
2325
import io.ktor.client.request.preparePost
@@ -26,9 +28,21 @@ import io.ktor.client.statement.bodyAsText
2628
import io.ktor.http.ContentType
2729
import io.ktor.http.HttpHeaders
2830
import io.ktor.http.HttpStatusCode
31+
import io.ktor.http.URLBuilder
32+
import io.ktor.http.URLProtocol
33+
import io.ktor.http.Url
2934
import io.ktor.http.contentType
35+
import io.ktor.http.takeFrom
3036
import io.ktor.utils.io.ByteReadChannel
3137
import io.ktor.utils.io.readUTF8Line
38+
import io.rsocket.kotlin.keepalive.KeepAlive
39+
import io.rsocket.kotlin.ktor.client.RSocketSupport
40+
import io.rsocket.kotlin.ktor.client.rSocket
41+
import io.rsocket.kotlin.payload.Payload
42+
import io.rsocket.kotlin.payload.PayloadMimeType
43+
import io.rsocket.kotlin.payload.buildPayload
44+
import io.rsocket.kotlin.payload.data
45+
import io.rsocket.kotlin.payload.metadata
3246
import kotlinx.coroutines.CancellationException
3347
import kotlinx.coroutines.CompletableDeferred
3448
import kotlinx.coroutines.CoroutineScope
@@ -44,8 +58,11 @@ import kotlinx.coroutines.flow.flow
4458
import kotlinx.coroutines.launch
4559
import kotlinx.coroutines.withContext
4660
import kotlinx.datetime.Clock
61+
import kotlinx.serialization.SerialName
62+
import kotlinx.serialization.Serializable
4763
import kotlinx.serialization.encodeToString
4864
import kotlinx.serialization.json.JsonObject
65+
import kotlin.time.Duration.Companion.seconds
4966

5067
internal class SyncStream(
5168
private val bucketStorage: BucketStorage,
@@ -55,6 +72,7 @@ internal class SyncStream(
5572
private val logger: Logger,
5673
private val params: JsonObject,
5774
private val scope: CoroutineScope,
75+
private val options: SyncOptions,
5876
createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient,
5977
) {
6078
private var isUploadingCrud = AtomicBoolean(false)
@@ -71,6 +89,37 @@ internal class SyncStream(
7189
createClient {
7290
install(HttpTimeout)
7391
install(ContentNegotiation)
92+
93+
(options.method as? ConnectionMethod.WebSocket)?.let {
94+
install(WebSockets)
95+
install(RSocketSupport) {
96+
connector {
97+
connectionConfig {
98+
payloadMimeType = PayloadMimeType(
99+
metadata = "application/json",
100+
data = "application/json"
101+
)
102+
103+
setupPayload {
104+
buildPayload {
105+
@Serializable
106+
class ConnectionSetupMetadata(
107+
// Kind of annoying to specify this here, https://github.com/rsocket/rsocket-kotlin/issues/311
108+
val token: String = "TODO: token",
109+
@SerialName("user_agent")
110+
val userAgent: String = "Kotlin SDK"
111+
)
112+
113+
metadata(JsonUtil.json.encodeToString(ConnectionSetupMetadata()))
114+
}
115+
}
116+
117+
keepAlive = it.keepAlive
118+
}
119+
}
120+
}
121+
}
122+
74123
}
75124

76125
fun invalidateCredentials() {
@@ -184,7 +233,7 @@ internal class SyncStream(
184233
return body.data.writeCheckpoint
185234
}
186235

187-
private fun streamingSyncRequest(req: JsonObject): Flow<String> =
236+
private fun connectViaHttp(req: JsonObject): Flow<String> =
188237
flow {
189238
val credentials = connector.getCredentialsCached()
190239
require(credentials != null) { "Not logged in" }
@@ -225,6 +274,22 @@ internal class SyncStream(
225274
}
226275
}
227276

277+
private fun connectViaWebSocket(req: JsonObject): Flow<ByteArray> = flow {
278+
val credentials = connector.getCredentialsCached()
279+
require(credentials != null) { "Not logged in" }
280+
val uri = URLBuilder(credentials.endpointUri("sync/stream")).apply {
281+
protocol = when (protocolOrNull) {
282+
URLProtocol.HTTP -> URLProtocol.WS
283+
else -> URLProtocol.WSS
284+
}
285+
}
286+
287+
val rSocket = httpClient.rSocket { url.takeFrom(uri) }
288+
rSocket.requestStream(buildPayload {
289+
metadata(JsonUtil.json.encodeToString(""))
290+
})
291+
}
292+
228293
private suspend fun streamingSyncIteration() {
229294
val iteration = ActiveIteration()
230295

@@ -308,7 +373,7 @@ internal class SyncStream(
308373
}
309374

310375
private suspend fun connect(start: Instruction.EstablishSyncStream) {
311-
streamingSyncRequest(start.request).collect { rawLine ->
376+
connectViaHttp(start.request).collect { rawLine ->
312377
control("line_text", rawLine)
313378
}
314379
}

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class SyncStreamTest {
100100
logger = logger,
101101
params = JsonObject(emptyMap()),
102102
scope = this,
103+
options = SyncOptions(),
103104
)
104105

105106
syncStream.invalidateCredentials()
@@ -137,6 +138,7 @@ class SyncStreamTest {
137138
logger = logger,
138139
params = JsonObject(emptyMap()),
139140
scope = this,
141+
options = SyncOptions(),
140142
)
141143

142144
syncStream.status.update { copy(connected = true) }
@@ -176,6 +178,7 @@ class SyncStreamTest {
176178
logger = logger,
177179
params = JsonObject(emptyMap()),
178180
scope = this,
181+
options = SyncOptions()
179182
)
180183

181184
// Launch streaming sync in a coroutine that we'll cancel after verification

gradle/libs.versions.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ kotlin = "2.1.10"
1414
coroutines = "1.8.1"
1515
kotlinx-datetime = "0.6.2"
1616
kotlinx-io = "0.5.4"
17-
ktor = "3.0.1"
17+
ktor = "3.1.0"
18+
rsocket = "0.20.0"
1819
uuid = "0.8.2"
1920
powersync-core = "0.3.12"
2021
sqlite-jdbc = "3.49.1.0"
@@ -85,6 +86,7 @@ ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotia
8586
ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" }
8687
ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" }
8788
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }
89+
rsocket-client = { module = "io.rsocket.kotlin:ktor-client-rsocket", version.ref = "rsocket" }
8890

8991
sqldelight-driver-native = { module = "app.cash.sqldelight:native-driver", version.ref = "sqlDelight" }
9092
sqliter = { module = "co.touchlab:sqliter-driver", version.ref = "sqliter" }

0 commit comments

Comments
 (0)