Skip to content

Commit ba4a5c2

Browse files
committed
Start tracking subscriptions
1 parent 29e0f50 commit ba4a5c2

File tree

6 files changed

+44
-9
lines changed

6 files changed

+44
-9
lines changed

crates/core/src/migrations.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,11 @@ CREATE TABLE ps_stream_subscriptions (
394394
local_params TEXT,
395395
ttl INTEGER
396396
) STRICT;
397-
ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id);
397+
ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]';
398398
399-
INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
399+
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
400400
json_object('sql', 'todo down migration'),
401-
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10')
401+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
402402
));
403403
";
404404
local_db.exec_safe(stmt)?;

crates/core/src/sync/interface.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,31 @@ use sqlite_nostd::{self as sqlite, ColumnType};
2020
use sqlite_nostd::{Connection, Context};
2121

2222
/// Payload provided by SDKs when requesting a sync iteration.
23-
#[derive(Default, Deserialize)]
23+
#[derive(Deserialize)]
2424
pub struct StartSyncStream {
2525
/// Bucket parameters to include in the request when opening a sync stream.
2626
#[serde(default)]
2727
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
2828
#[serde(default)]
2929
pub schema: Schema,
30+
#[serde(default = "StartSyncStream::include_defaults_by_default")]
31+
pub include_defaults: bool,
32+
}
33+
34+
impl StartSyncStream {
35+
pub const fn include_defaults_by_default() -> bool {
36+
true
37+
}
38+
}
39+
40+
impl Default for StartSyncStream {
41+
fn default() -> Self {
42+
Self {
43+
parameters: Default::default(),
44+
schema: Default::default(),
45+
include_defaults: Self::include_defaults_by_default(),
46+
}
47+
}
3048
}
3149

3250
/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
@@ -107,6 +125,7 @@ pub struct StreamingSyncRequest {
107125
pub binary_data: bool,
108126
pub client_id: String,
109127
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
128+
pub streams: StreamSubscriptionRequest,
110129
}
111130

112131
#[derive(Serialize)]

crates/core/src/sync/line.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ pub struct BucketChecksum<'a> {
120120
pub priority: Option<BucketPriority>,
121121
#[serde(default)]
122122
pub count: Option<i64>,
123-
#[serde_as(as = "Option<Vec<DisplayFromStr>>")]
123+
#[serde_as(as = "Vec<Option<DisplayFromStr>>")]
124124
#[serde(default)]
125-
pub subscriptions: Option<Vec<i64>>,
125+
pub subscriptions: Vec<Option<i64>>,
126126
// #[serde(default)]
127127
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
128128
// pub last_op_id: Option<i64>,

crates/core/src/sync/storage_adapter.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
state::DatabaseState,
1313
sync::{
1414
checkpoint::{ChecksumMismatch, validate_checkpoint},
15+
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
1516
sync_status::SyncPriorityStatus,
1617
},
1718
sync_local::{PartialSyncOperation, SyncOperation},
@@ -268,6 +269,18 @@ impl StorageAdapter {
268269
}
269270
}
270271

272+
pub fn collect_subscription_requests(
273+
&self,
274+
include_defaults: bool,
275+
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
276+
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
277+
278+
Ok(StreamSubscriptionRequest {
279+
include_defaults,
280+
subscriptions,
281+
})
282+
}
283+
271284
pub fn now(&self) -> Result<Timestamp, ResultCode> {
272285
self.time_stmt.step()?;
273286
let res = Timestamp(self.time_stmt.column_int64(0));

crates/core/src/sync/streaming_sync.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,9 @@ impl StreamingSyncIteration {
592592
binary_data: true,
593593
client_id: client_id(self.db)?,
594594
parameters: self.options.parameters.take(),
595+
streams: self
596+
.adapter
597+
.collect_subscription_requests(self.options.include_defaults)?,
595598
};
596599

597600
event

crates/core/src/sync/subscriptions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::time::Duration;
22

3-
use alloc::string::String;
3+
use alloc::{boxed::Box, string::String};
44
use serde::Deserialize;
55
use serde_with::{serde_as, DurationSeconds};
66

@@ -16,7 +16,7 @@ pub enum SubscriptionChangeRequest {
1616
#[derive(Deserialize)]
1717
pub struct SubscribeToStream {
1818
pub stream: String,
19-
pub params: Option<serde_json::value::RawValue>,
19+
pub params: Option<Box<serde_json::value::RawValue>>,
2020
#[serde_as(as = "Option<DurationSeconds>")]
2121
pub ttl: Option<Duration>,
2222
pub priority: Option<BucketPriority>,
@@ -25,6 +25,6 @@ pub struct SubscribeToStream {
2525
#[derive(Deserialize)]
2626
pub struct UnsubscribeFromStream {
2727
pub stream: String,
28-
pub params: Option<serde_json::value::RawValue>,
28+
pub params: Option<Box<serde_json::value::RawValue>>,
2929
pub immediate: bool,
3030
}

0 commit comments

Comments
 (0)