Skip to content

Commit b3fd778

Browse files
committed
Test handling default streams
1 parent e36383c commit b3fd778

File tree

14 files changed

+219
-71
lines changed

14 files changed

+219
-71
lines changed

crates/core/src/migrations.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError};
1212
use crate::fix_data::apply_v035_fix;
1313
use crate::sync::BucketPriority;
1414

15-
pub const LATEST_VERSION: i32 = 10;
15+
pub const LATEST_VERSION: i32 = 11;
1616

1717
pub fn powersync_migrate(
1818
ctx: *mut sqlite::context,
@@ -387,22 +387,23 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
387387
if current_version < 11 && target_version >= 11 {
388388
let stmt = "\
389389
CREATE TABLE ps_stream_subscriptions (
390-
id NOT NULL INTEGER PRIMARY KEY,
390+
id INTEGER NOT NULL PRIMARY KEY,
391391
stream_name TEXT NOT NULL,
392392
active INTEGER NOT NULL DEFAULT FALSE,
393393
is_default INTEGER NOT NULL DEFAULT FALSE,
394394
local_priority INTEGER,
395395
local_params TEXT,
396396
ttl INTEGER,
397-
expires_at INTEGER
397+
expires_at INTEGER,
398+
last_synced_at INTEGER
398399
) STRICT;
399400
400401
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
401402
json_object('sql', 'todo down migration'),
402403
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
403404
));
404405
";
405-
local_db.exec_safe(stmt)?;
406+
local_db.exec_safe(stmt).into_db_result(local_db)?;
406407
}
407408

408409
Ok(())

crates/core/src/sync/storage_adapter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ impl StorageAdapter {
309309
})?,
310310
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
311311
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
312+
last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?,
312313
})
313314
}
314315

@@ -323,8 +324,6 @@ impl StorageAdapter {
323324
while stmt.step()? == ResultCode::ROW {
324325
action(Self::read_stream_subscription(&stmt)?);
325326
}
326-
327-
stmt.finalize()?;
328327
Ok(())
329328
}
330329

crates/core/src/sync/streaming_sync.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ impl StreamingSyncIteration {
592592
if subscription.is_default {
593593
let found = tracked_subscriptions
594594
.iter()
595-
.filter(|s| s.stream_name == subscription.name)
595+
.filter(|s| s.stream_name == subscription.name && s.local_params.is_none())
596596
.next();
597597

598598
if found.is_none() {
@@ -604,10 +604,20 @@ impl StreamingSyncIteration {
604604

605605
debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id));
606606

607-
let mut resolved: Vec<ActiveStreamSubscription> = tracked_subscriptions
608-
.iter()
609-
.map(|e| ActiveStreamSubscription::from_local(e))
610-
.collect();
607+
let mut resolved: Vec<ActiveStreamSubscription> =
608+
Vec::with_capacity(tracked_subscriptions.len());
609+
// Map of stream name to index in resolved for stream subscriptions without custom
610+
// parameters. This simplifies the association from BucketSubscriptionReason::IsDefault to
611+
// stream subscriptions later.
612+
let mut default_stream_subscriptions = BTreeMap::<&str, usize>::new();
613+
614+
for (i, subscription) in tracked_subscriptions.iter().enumerate() {
615+
resolved.push(ActiveStreamSubscription::from_local(subscription));
616+
617+
if subscription.local_params.is_none() {
618+
default_stream_subscriptions.insert(&subscription.stream_name, i);
619+
}
620+
}
611621

612622
// TODO: Cleanup old default subscriptions?
613623

@@ -625,7 +635,13 @@ impl StreamingSyncIteration {
625635
}
626636
}
627637
}
628-
BucketSubscriptionReason::IsDefault { stream_name } => todo!(),
638+
BucketSubscriptionReason::IsDefault { stream_name } => {
639+
if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) {
640+
resolved[*index]
641+
.associated_buckets
642+
.push(bucket.bucket.clone());
643+
}
644+
}
629645
BucketSubscriptionReason::Unknown => {}
630646
}
631647
}

crates/core/src/sync/subscriptions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct LocallyTrackedSubscription {
2323
pub local_params: Option<Box<JsonString>>,
2424
pub ttl: Option<i64>,
2525
pub expires_at: Option<i64>,
26+
pub last_synced_at: Option<i64>,
2627
}
2728

2829
impl LocallyTrackedSubscription {

crates/core/src/sync/sync_status.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ pub struct ActiveStreamSubscription {
271271
pub active: bool,
272272
pub is_default: bool,
273273
pub expires_at: Option<Timestamp>,
274-
pub has_synced: bool,
275274
pub last_synced_at: Option<Timestamp>,
276275
}
277276

@@ -284,8 +283,7 @@ impl ActiveStreamSubscription {
284283
associated_buckets: Vec::new(),
285284
active: local.active,
286285
expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
287-
has_synced: false, // TODDO
288-
last_synced_at: None, // TODO
286+
last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
289287
}
290288
}
291289
}

crates/core/src/util.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String {
6666
return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\""));
6767
}
6868

69+
/// Calls [read] to read a column if it's not null, otherwise returns [None].
70+
#[inline]
6971
pub fn column_nullable<T, R: FnOnce() -> Result<T, PowerSyncError>>(
7072
stmt: &ManagedStmt,
7173
index: i32,

dart/test/error_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import 'dart:convert';
33
import 'package:sqlite3/common.dart';
44
import 'package:test/test.dart';
55

6-
import 'utils/matchers.dart';
76
import 'utils/native_test_utils.dart';
7+
import 'utils/test_utils.dart';
88

99
void main() {
1010
group('error reporting', () {

dart/test/goldens/simple_iteration.json

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"connected": false,
1010
"connecting": true,
1111
"priority_status": [],
12-
"downloading": null
12+
"downloading": null,
13+
"streams": []
1314
}
1415
}
1516
},
@@ -21,7 +22,11 @@
2122
"raw_data": true,
2223
"binary_data": true,
2324
"client_id": "test-test-test-test",
24-
"parameters": null
25+
"parameters": null,
26+
"streams": {
27+
"include_defaults": true,
28+
"subscriptions": []
29+
}
2530
}
2631
}
2732
}
@@ -59,7 +64,8 @@
5964
"target_count": 1
6065
}
6166
}
62-
}
67+
},
68+
"streams": []
6369
}
6470
}
6571
}
@@ -108,7 +114,8 @@
108114
"target_count": 1
109115
}
110116
}
111-
}
117+
},
118+
"streams": []
112119
}
113120
}
114121
}
@@ -146,7 +153,8 @@
146153
"has_synced": true
147154
}
148155
],
149-
"downloading": null
156+
"downloading": null,
157+
"streams": []
150158
}
151159
}
152160
}

dart/test/goldens/starting_stream.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
"connected": false,
1414
"connecting": true,
1515
"priority_status": [],
16-
"downloading": null
16+
"downloading": null,
17+
"streams": []
1718
}
1819
}
1920
},
@@ -27,6 +28,10 @@
2728
"client_id": "test-test-test-test",
2829
"parameters": {
2930
"foo": "bar"
31+
},
32+
"streams": {
33+
"include_defaults": true,
34+
"subscriptions": []
3035
}
3136
}
3237
}

dart/test/sync_stream_test.dart

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import 'dart:convert';
2+
3+
import 'package:file/local.dart';
4+
import 'package:sqlite3/common.dart';
5+
import 'package:sqlite3/sqlite3.dart';
6+
import 'package:sqlite3_test/sqlite3_test.dart';
7+
import 'package:test/test.dart';
8+
9+
import 'utils/native_test_utils.dart';
10+
import 'utils/test_utils.dart';
11+
12+
void main() {
13+
final vfs = TestSqliteFileSystem(
14+
fs: const LocalFileSystem(), name: 'vfs-stream-test');
15+
16+
setUpAll(() {
17+
loadExtension();
18+
sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
19+
});
20+
tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));
21+
22+
late CommonDatabase db;
23+
Object? lastStatus;
24+
25+
setUp(() async {
26+
db = openTestDatabase(vfs: vfs)
27+
..select('select powersync_init();')
28+
..select('select powersync_replace_schema(?)', [json.encode(testSchema)])
29+
..execute('update ps_kv set value = ?2 where key = ?1',
30+
['client_id', 'test-test-test-test']);
31+
});
32+
33+
tearDown(() {
34+
db.dispose();
35+
});
36+
37+
List<Object?> control(String operation, Object? data) {
38+
db.execute('begin');
39+
ResultSet result;
40+
41+
try {
42+
result = db.select('SELECT powersync_control(?, ?)', [operation, data]);
43+
} catch (e) {
44+
db.execute('rollback');
45+
rethrow;
46+
}
47+
48+
db.execute('commit');
49+
final [row] = result;
50+
final instructions = jsonDecode(row.columnAt(0)) as List;
51+
for (final instruction in instructions) {
52+
if (instruction case {'UpdateSyncStatus': final status}) {
53+
lastStatus = status['status']!;
54+
}
55+
}
56+
57+
return instructions;
58+
}
59+
60+
group('default streams', () {
61+
test('are created on-demand', () {
62+
control('start', null);
63+
control(
64+
'line_text',
65+
json.encode(
66+
checkpoint(
67+
lastOpId: 1,
68+
buckets: [
69+
bucketDescription('a',
70+
subscriptions: 'my_default_stream', priority: 1),
71+
],
72+
streams: [('my_default_stream', true)],
73+
),
74+
),
75+
);
76+
77+
expect(
78+
lastStatus,
79+
containsPair(
80+
'streams',
81+
[
82+
{
83+
'name': 'my_default_stream',
84+
'parameters': null,
85+
'associated_buckets': ['a'],
86+
'active': true,
87+
'is_default': true,
88+
'expires_at': null,
89+
'last_synced_at': null
90+
}
91+
],
92+
),
93+
);
94+
});
95+
});
96+
}

0 commit comments

Comments
 (0)