Skip to content

Commit ae9ea85

Browse files
committed
Track subscriptions
1 parent 5967d89 commit ae9ea85

File tree

9 files changed

+417
-26
lines changed

9 files changed

+417
-26
lines changed

crates/core/src/migrations.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,13 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
389389
CREATE TABLE ps_stream_subscriptions (
390390
id NOT NULL INTEGER PRIMARY KEY,
391391
stream_name TEXT NOT NULL,
392-
is_default INTEGER NOT NULL,
392+
active INTEGER NOT NULL DEFAULT FALSE,
393+
is_default INTEGER NOT NULL DEFAULT FALSE,
393394
local_priority INTEGER,
394395
local_params TEXT,
395-
ttl INTEGER
396+
ttl INTEGER,
397+
expires_at INTEGER
396398
) STRICT;
397-
ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]';
398399
399400
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
400401
json_object('sql', 'todo down migration'),

crates/core/src/sync/checkpoint.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use alloc::{string::String, vec::Vec};
22
use num_traits::Zero;
33

4-
use crate::sync::{BucketPriority, Checksum, line::BucketChecksum};
4+
use crate::sync::line::{BucketChecksum, BucketSubscriptionReason};
5+
use crate::sync::{BucketPriority, Checksum};
56
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
67

78
/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from
@@ -12,6 +13,7 @@ pub struct OwnedBucketChecksum {
1213
pub checksum: Checksum,
1314
pub priority: BucketPriority,
1415
pub count: Option<i64>,
16+
pub subscriptions: BucketSubscriptionReason,
1517
}
1618

1719
impl OwnedBucketChecksum {
@@ -30,6 +32,7 @@ impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {
3032
checksum: value.checksum,
3133
priority: value.priority.unwrap_or(BucketPriority::FALLBACK),
3234
count: value.count,
35+
subscriptions: value.subscriptions.clone(),
3336
}
3437
}
3538
}

crates/core/src/sync/interface.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use sqlite_nostd::bindings::SQLITE_RESULT_SUBTYPE;
1919
use sqlite_nostd::{self as sqlite, ColumnType};
2020
use sqlite_nostd::{Connection, Context};
2121

22+
use crate::sync::BucketPriority;
23+
2224
/// Payload provided by SDKs when requesting a sync iteration.
2325
#[derive(Deserialize)]
2426
pub struct StartSyncStream {
@@ -141,6 +143,7 @@ pub struct RequestedStreamSubscription {
141143
pub stream: String,
142144
/// Parameters to make available in the stream's definition.
143145
pub parameters: Box<serde_json::value::RawValue>,
146+
pub override_priority: Option<BucketPriority>,
144147
#[serde_as(as = "DisplayFromStr")]
145148
pub client_id: i64,
146149
}

crates/core/src/sync/line.rs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use alloc::borrow::Cow;
2+
use alloc::string::{String, ToString};
23
use alloc::vec::Vec;
34
use serde::Deserialize;
4-
use serde::de::{IgnoredAny, VariantAccess, Visitor};
5+
use serde::de::{Error, IgnoredAny, VariantAccess, Visitor};
56
use serde_with::{DisplayFromStr, serde_as};
67

78
use super::Checksum;
@@ -82,6 +83,14 @@ pub struct Checkpoint<'a> {
8283
pub write_checkpoint: Option<i64>,
8384
#[serde(borrow)]
8485
pub buckets: Vec<BucketChecksum<'a>>,
86+
#[serde(default, borrow)]
87+
pub streams: Vec<StreamDefinition<'a>>,
88+
}
89+
90+
#[derive(Deserialize, Debug)]
91+
pub struct StreamDefinition<'a> {
92+
pub name: SyncLineStr<'a>,
93+
pub is_default: bool,
8594
}
8695

8796
#[serde_as]
@@ -120,14 +129,67 @@ pub struct BucketChecksum<'a> {
120129
pub priority: Option<BucketPriority>,
121130
#[serde(default)]
122131
pub count: Option<i64>,
123-
#[serde_as(as = "Vec<Option<DisplayFromStr>>")]
124132
#[serde(default)]
125-
pub subscriptions: Vec<Option<i64>>,
133+
pub subscriptions: BucketSubscriptionReason,
126134
// #[serde(default)]
127135
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
128136
// pub last_op_id: Option<i64>,
129137
}
130138

139+
/// The reason for why a bucket was included in a checkpoint.
140+
#[derive(Debug, Default, Clone)]
141+
pub enum BucketSubscriptionReason {
142+
/// A bucket was created for all of the subscription ids we've explicitly requested in the sync
143+
/// request.
144+
ExplicitlySubscribed { subscriptions: Vec<i64> },
145+
/// A bucket was created from a default stream.
146+
IsDefault { stream_name: String },
147+
/// We're talking to an older sync service not sending the reason.
148+
#[default]
149+
Unknown,
150+
}
151+
152+
impl<'de> Deserialize<'de> for BucketSubscriptionReason {
153+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
154+
where
155+
D: serde::Deserializer<'de>,
156+
{
157+
struct MyVisitor;
158+
159+
impl<'de> Visitor<'de> for MyVisitor {
160+
type Value = BucketSubscriptionReason;
161+
162+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
163+
write!(formatter, "a subscription reason")
164+
}
165+
166+
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
167+
where
168+
A: serde::de::SeqAccess<'de>,
169+
{
170+
let mut subscriptions = Vec::<i64>::new();
171+
172+
while let Some(item) = seq.next_element::<&'de str>()? {
173+
subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?);
174+
}
175+
176+
Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions })
177+
}
178+
179+
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
180+
where
181+
E: serde::de::Error,
182+
{
183+
Ok(BucketSubscriptionReason::IsDefault {
184+
stream_name: v.to_string(),
185+
})
186+
}
187+
}
188+
189+
deserializer.deserialize_any(MyVisitor)
190+
}
191+
}
192+
131193
#[derive(Deserialize, Debug)]
132194
pub struct DataLine<'a> {
133195
#[serde(borrow)]
@@ -229,6 +291,7 @@ mod tests {
229291
last_op_id: 10,
230292
write_checkpoint: None,
231293
buckets: _,
294+
streams: _,
232295
})
233296
);
234297

@@ -264,6 +327,7 @@ mod tests {
264327
last_op_id: 1,
265328
write_checkpoint: None,
266329
buckets: _,
330+
streams: _,
267331
})
268332
);
269333
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
22

33
use alloc::{string::ToString, vec::Vec};
44
use serde::Serialize;
5+
use serde_json::value::RawValue;
56
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
67

78
use crate::{
@@ -13,9 +14,12 @@ use crate::{
1314
sync::{
1415
checkpoint::{ChecksumMismatch, validate_checkpoint},
1516
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
17+
streaming_sync::OwnedStreamDefinition,
18+
subscriptions::LocallyTrackedSubscription,
1619
sync_status::SyncPriorityStatus,
1720
},
1821
sync_local::{PartialSyncOperation, SyncOperation},
22+
util::{JsonString, column_nullable},
1923
};
2024

2125
use super::{
@@ -288,6 +292,55 @@ impl StorageAdapter {
288292

289293
Ok(res)
290294
}
295+
296+
fn read_stream_subscription(
297+
stmt: &ManagedStmt,
298+
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
299+
Ok(LocallyTrackedSubscription {
300+
id: stmt.column_int64(0),
301+
stream_name: stmt.column_text(1)?.to_string(),
302+
active: stmt.column_int(2) != 0,
303+
is_default: stmt.column_int(3) != 0,
304+
local_priority: column_nullable(&stmt, 4, || {
305+
BucketPriority::try_from(stmt.column_int(4))
306+
})?,
307+
local_params: column_nullable(&stmt, 5, || {
308+
JsonString::from_string(stmt.column_text(5)?.to_string())
309+
})?,
310+
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
311+
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
312+
})
313+
}
314+
315+
pub fn iterate_local_subscriptions<F: FnMut(LocallyTrackedSubscription) -> ()>(
316+
&self,
317+
mut action: F,
318+
) -> Result<(), PowerSyncError> {
319+
let stmt = self
320+
.db
321+
.prepare_v2("SELECT * FROM ps_stream_subscriptions ORDER BY id ASC")?;
322+
323+
while stmt.step()? == ResultCode::ROW {
324+
action(Self::read_stream_subscription(&stmt)?);
325+
}
326+
327+
stmt.finalize()?;
328+
Ok(())
329+
}
330+
331+
pub fn create_default_subscription(
332+
&self,
333+
stream: &OwnedStreamDefinition,
334+
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
335+
let stmt = self.db.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, active, is_default) VALUES (?, TRUE, TRUE) RETURNING *;")?;
336+
stmt.bind_text(1, &stream.name, sqlite_nostd::Destructor::STATIC)?;
337+
338+
if stmt.step()? == ResultCode::ROW {
339+
Self::read_stream_subscription(&stmt)
340+
} else {
341+
Err(PowerSyncError::unknown_internal())
342+
}
343+
}
291344
}
292345

293346
pub struct BucketInfo {

0 commit comments

Comments
 (0)