Skip to content

Commit e428df3

Browse files
committed
Add RSocket client
1 parent a472d50 commit e428df3

File tree

12 files changed

+194
-74
lines changed

12 files changed

+194
-74
lines changed

connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import io.github.jan.supabase.postgrest.from
2020
import io.github.jan.supabase.storage.BucketApi
2121
import io.github.jan.supabase.storage.Storage
2222
import io.github.jan.supabase.storage.storage
23+
import io.ktor.client.call.body
2324
import io.ktor.client.plugins.HttpSend
2425
import io.ktor.client.plugins.plugin
26+
import io.ktor.client.request.get
2527
import io.ktor.client.statement.bodyAsText
2628
import io.ktor.utils.io.InternalAPI
2729
import kotlinx.coroutines.flow.StateFlow
30+
import kotlinx.serialization.Serializable
2831
import kotlinx.serialization.json.Json
2932

3033
/**
@@ -162,12 +165,17 @@ public class SupabaseConnector(
162165
supabaseClient.auth.currentSessionOrNull()
163166
?: error("Could not fetch Supabase credentials")
164167

165-
check(session.user != null) { "No user data" }
168+
@Serializable
169+
class TokenResponse(
170+
val token: String
171+
)
172+
173+
val src = supabaseClient.httpClient.httpClient.get("http://localhost:6060/api/auth/token").body<TokenResponse>()
166174

167175
// userId is for debugging purposes only
168176
PowerSyncCredentials(
169-
endpoint = powerSyncEndpoint,
170-
token = session.accessToken, // Use the access token to authenticate against PowerSync
177+
endpoint = "http://localhost:8080", //powerSyncEndpoint,
178+
token = src.token, //session.accessToken, // Use the access token to authenticate against PowerSync
171179
userId = session.user!!.id,
172180
)
173181
}

core/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ kotlin {
174174
implementation(libs.ktor.client.contentnegotiation)
175175
implementation(libs.ktor.serialization.json)
176176
implementation(libs.kotlinx.io)
177-
implementation(libs.rsocket.client)
177+
api(libs.rsocket.core)
178+
implementation(libs.rsocket.transport.websocket)
178179
implementation(libs.kotlinx.coroutines.core)
179180
implementation(libs.kotlinx.datetime)
180181
implementation(libs.stately.concurrency)

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ import com.powersync.sync.Instruction
1212
import com.powersync.sync.SyncDataBatch
1313
import com.powersync.sync.SyncLocalDatabaseResult
1414
import com.powersync.utils.JsonUtil
15+
import io.ktor.utils.io.asByteWriteChannel
16+
import io.ktor.utils.io.writeByteArray
17+
import kotlinx.io.Buffer
18+
import kotlinx.io.buffered
19+
import kotlinx.io.files.FileSystem
20+
import kotlinx.io.files.Path
21+
import kotlinx.io.files.SystemFileSystem
1522
import kotlinx.serialization.Serializable
1623
import kotlinx.serialization.encodeToString
1724

@@ -178,7 +185,17 @@ internal class BucketStorageImpl(
178185
return db.writeTransaction { tx ->
179186
logger.v { "powersync_control($op, binary payload)" }
180187

181-
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
188+
try {
189+
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
190+
} catch (e: Exception) {
191+
println("Got control exception, writing")
192+
SystemFileSystem.sink(Path("/Users/simon/failing_line.bin")).buffered().apply {
193+
write(payload)
194+
flush()
195+
close()
196+
}
197+
throw e
198+
}
182199
}
183200
}
184201
}

core/src/commonMain/kotlin/com/powersync/sync/Progress.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public data class SyncDownloadProgress internal constructor(
9090
.asSequence()
9191
.filter { it.priority >= priority }
9292
.fold(0L to 0L) { (prevTarget, prevCompleted), entry ->
93-
(prevTarget + entry.targetCount) to (prevCompleted + entry.sinceLast)
93+
(prevTarget + entry.targetCount - entry.atLast) to (prevCompleted + entry.sinceLast)
9494
}
9595
.let { it.first.toInt() to it.second.toInt() }
9696
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package com.powersync.sync
2+
3+
import com.powersync.connectors.PowerSyncCredentials
4+
import com.powersync.utils.JsonUtil
5+
import io.ktor.client.HttpClient
6+
import io.ktor.client.plugins.websocket.webSocketSession
7+
import io.ktor.http.URLBuilder
8+
import io.ktor.http.URLProtocol
9+
import io.ktor.http.takeFrom
10+
import io.rsocket.kotlin.core.RSocketConnector
11+
import io.rsocket.kotlin.payload.PayloadMimeType
12+
import io.rsocket.kotlin.payload.buildPayload
13+
import io.rsocket.kotlin.payload.data
14+
import io.rsocket.kotlin.payload.metadata
15+
import io.rsocket.kotlin.transport.RSocketClientTarget
16+
import io.rsocket.kotlin.transport.RSocketConnection
17+
import io.rsocket.kotlin.transport.RSocketTransportApi
18+
import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection
19+
import kotlinx.coroutines.Dispatchers
20+
import kotlinx.coroutines.IO
21+
import kotlinx.coroutines.currentCoroutineContext
22+
import kotlinx.coroutines.flow.Flow
23+
import kotlinx.coroutines.flow.emitAll
24+
import kotlinx.coroutines.flow.flow
25+
import kotlinx.coroutines.flow.flowOn
26+
import kotlinx.coroutines.flow.map
27+
import kotlinx.io.readByteArray
28+
import kotlinx.serialization.SerialName
29+
import kotlinx.serialization.Serializable
30+
import kotlinx.serialization.json.JsonObject
31+
import kotlin.coroutines.CoroutineContext
32+
33+
@OptIn(RSocketTransportApi::class)
34+
internal fun HttpClient.rSocketSyncStream(
35+
options: ConnectionMethod.WebSocket,
36+
req: JsonObject,
37+
credentials: PowerSyncCredentials,
38+
): Flow<ByteArray> = flow {
39+
val flowContext = currentCoroutineContext()
40+
41+
val websocketUri = URLBuilder(credentials.endpointUri("sync/stream")).apply {
42+
protocol = when (protocolOrNull) {
43+
URLProtocol.HTTP -> URLProtocol.WS
44+
else -> URLProtocol.WSS
45+
}
46+
}
47+
48+
// Note: We're using a custom connector here because we need to set options for each request
49+
// without creating a new HTTP client each time. The recommended approach would be to add an
50+
// RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for
51+
// all connections (bad because we need a short-lived token in there).
52+
// https://github.com/rsocket/rsocket-kotlin/issues/311
53+
val target = object : RSocketClientTarget {
54+
@RSocketTransportApi
55+
override suspend fun connectClient(): RSocketConnection {
56+
val ws = webSocketSession {
57+
url.takeFrom(websocketUri)
58+
}
59+
return KtorWebSocketConnection(ws)
60+
}
61+
62+
override val coroutineContext: CoroutineContext
63+
get() = flowContext
64+
}
65+
66+
val connector = RSocketConnector {
67+
connectionConfig {
68+
payloadMimeType = PayloadMimeType(
69+
metadata = "application/json",
70+
data = "application/json"
71+
)
72+
73+
setupPayload {
74+
buildPayload {
75+
data("{}")
76+
metadata(JsonUtil.json.encodeToString(ConnectionSetupMetadata(token="Bearer ${credentials.token}")))
77+
}
78+
}
79+
80+
keepAlive = options.keepAlive
81+
}
82+
}
83+
84+
val rSocket = connector.connect(target)
85+
val syncStream = rSocket.requestStream(buildPayload {
86+
data("{}")
87+
metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream")))
88+
})
89+
90+
emitAll(syncStream.map { it.data.readByteArray() }.flowOn(Dispatchers.IO))
91+
}
92+
93+
/**
94+
* The metadata payload we need to use when connecting with RSocket.
95+
*
96+
* This corresponds to `RSocketContextMeta` on the sync service.
97+
*/
98+
@Serializable
99+
private class ConnectionSetupMetadata(
100+
val token: String,
101+
@SerialName("user_agent")
102+
val userAgent: String = userAgent()
103+
)
104+
105+
/**
106+
* The metadata payload we send for the `REQUEST_STREAM` frame.
107+
*/
108+
@Serializable
109+
private class RequestStreamMetadata(
110+
val path: String
111+
)

core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,27 @@ import io.rsocket.kotlin.keepalive.KeepAlive
44
import kotlin.time.Duration.Companion.seconds
55

66
public class SyncOptions(
7-
public val method: ConnectionMethod = ConnectionMethod.WebSocket,
7+
public val method: ConnectionMethod = ConnectionMethod.Http,
88
)
99

1010
/**
1111
* The connection method to use when the SDK connects to the sync service.
1212
*/
1313
public sealed interface ConnectionMethod {
1414
/**
15-
* Receive sync lines via an streamed HTTP response from the sync service.
15+
* Receive sync lines via streamed HTTP response from the sync service.
1616
*
1717
* This mode is less efficient than [WebSocket] because it doesn't support backpressure
1818
* properly and uses JSON instead of the more efficient BSON representation for sync lines.
19+
*
20+
* This is currently the default, but this will be changed once [WebSocket] support is stable.
1921
*/
2022
public data object Http: ConnectionMethod
2123

2224
/**
2325
* Receive binary sync lines via RSocket over a WebSocket connection.
2426
*
25-
* This is the default mode, and recommended for most clients.
27+
* This connection mode is currently experimental and requires a recent sync service to work.
2628
*/
2729
public data class WebSocket(
2830
val keepAlive: KeepAlive = DefaultKeepAlive

0 commit comments

Comments
 (0)