Skip to content

Commit 34659a5

Browse files
committed
Use connection manager for tests
1 parent 7b22257 commit 34659a5

File tree

4 files changed

+115
-51
lines changed

4 files changed

+115
-51
lines changed

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ final class ConnectionManager {
2525
final Map<_RawStreamKey, _ActiveSubscription> _locallyActiveSubscriptions =
2626
{};
2727

28-
final StreamController<SyncStatus> _statusController = StreamController();
28+
final StreamController<SyncStatus> _statusController =
29+
StreamController.broadcast();
2930

3031
/// Fires when an entry is added or removed from [_locallyActiveSubscriptions]
3132
/// while we're connected.

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ final class _ActiveRustStreamingIteration {
622622
'include_defaults': sync.options.includeDefaultStreams,
623623
'active_streams': sync._activeSubscriptions
624624
.map((s) => {'name': s.name, 'params': s.parameters})
625+
.toList(),
625626
}),
626627
);
627628
assert(_completedStream.isCompleted, 'Should have started streaming');

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import 'package:async/async.dart';
55
import 'package:logging/logging.dart';
66
import 'package:powersync_core/powersync_core.dart';
77
import 'package:powersync_core/sqlite3_common.dart';
8-
import 'package:powersync_core/src/sync/streaming_sync.dart';
98
import 'package:powersync_core/src/sync/protocol.dart';
109
import 'package:test/test.dart';
1110

@@ -53,35 +52,32 @@ void _declareTests(String name, SyncOptions options, bool bson) {
5352

5453
late TestPowerSyncFactory factory;
5554
late CommonDatabase raw;
56-
late PowerSyncDatabase database;
55+
late TestDatabase database;
5756
late MockSyncService syncService;
5857
late Logger logger;
5958

60-
late StreamingSync syncClient;
6159
var credentialsCallbackCount = 0;
6260
Future<void> Function(PowerSyncDatabase) uploadData = (db) async {};
6361

64-
void createSyncClient({Schema? schema}) {
62+
Future<void> connect() async {
6563
final (client, server) = inMemoryServer();
6664
server.mount(syncService.router.call);
6765

68-
final thisSyncClient = syncClient = database.connectWithMockService(
69-
client,
70-
TestConnector(() async {
71-
credentialsCallbackCount++;
72-
return PowerSyncCredentials(
73-
endpoint: server.url.toString(),
74-
token: 'token$credentialsCallbackCount',
75-
expiresAt: DateTime.now(),
76-
);
77-
}, uploadData: (db) => uploadData(db)),
66+
database.httpClient = client;
67+
await database.connect(
68+
connector: TestConnector(
69+
() async {
70+
credentialsCallbackCount++;
71+
return PowerSyncCredentials(
72+
endpoint: server.url.toString(),
73+
token: 'token$credentialsCallbackCount',
74+
expiresAt: DateTime.now(),
75+
);
76+
},
77+
uploadData: (db) => uploadData(db),
78+
),
7879
options: options,
79-
customSchema: schema,
8080
);
81-
82-
addTearDown(() async {
83-
await thisSyncClient.abort();
84-
});
8581
}
8682

8783
setUp(() async {
@@ -92,7 +88,6 @@ void _declareTests(String name, SyncOptions options, bool bson) {
9288
factory = await testUtils.testFactory();
9389
(raw, database) = await factory.openInMemoryDatabase();
9490
await database.initialize();
95-
createSyncClient();
9691
});
9792

9893
tearDown(() async {
@@ -109,7 +104,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
109104
}
110105
});
111106
}
112-
syncClient.streamingSync();
107+
await connect();
113108
await syncService.waitForListener;
114109

115110
expect(database.currentStatus.lastSyncedAt, isNull);
@@ -144,7 +139,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
144139
});
145140
await expectLater(
146141
status, emits(isSyncStatus(downloading: false, hasSynced: true)));
147-
await syncClient.abort();
142+
await database.disconnect();
148143

149144
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
150145
addTearDown(independentDb.close);
@@ -155,7 +150,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
155150
// A complete sync also means that all partial syncs have completed
156151
expect(
157152
independentDb.currentStatus
158-
.statusForPriority(BucketPriority(3))
153+
.statusForPriority(StreamPriority(3))
159154
.hasSynced,
160155
isTrue);
161156
});
@@ -249,7 +244,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
249244
database.watch('SELECT * FROM lists', throttle: Duration.zero));
250245
await expectLater(query, emits(isEmpty));
251246

252-
createSyncClient(schema: schema);
247+
await database.updateSchema(schema);
253248
await waitForConnection();
254249

255250
syncService
@@ -374,13 +369,13 @@ void _declareTests(String name, SyncOptions options, bool bson) {
374369
status,
375370
emitsThrough(
376371
isSyncStatus(downloading: true, hasSynced: false).having(
377-
(e) => e.statusForPriority(BucketPriority(0)).hasSynced,
372+
(e) => e.statusForPriority(StreamPriority(0)).hasSynced,
378373
'status for $prio',
379374
isTrue,
380375
)),
381376
);
382377

383-
await database.waitForFirstSync(priority: BucketPriority(prio));
378+
await database.waitForFirstSync(priority: StreamPriority(prio));
384379
expect(await database.getAll('SELECT * FROM customers'),
385380
hasLength(prio + 1));
386381
}
@@ -417,9 +412,9 @@ void _declareTests(String name, SyncOptions options, bool bson) {
417412
'priority': 1,
418413
}
419414
});
420-
await database.waitForFirstSync(priority: BucketPriority(1));
415+
await database.waitForFirstSync(priority: StreamPriority(1));
421416
expect(database.currentStatus.hasSynced, isFalse);
422-
await syncClient.abort();
417+
await database.disconnect();
423418

424419
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
425420
addTearDown(independentDb.close);
@@ -428,12 +423,12 @@ void _declareTests(String name, SyncOptions options, bool bson) {
428423
// Completing a sync for prio 1 implies a completed sync for prio 0
429424
expect(
430425
independentDb.currentStatus
431-
.statusForPriority(BucketPriority(0))
426+
.statusForPriority(StreamPriority(0))
432427
.hasSynced,
433428
isTrue);
434429
expect(
435430
independentDb.currentStatus
436-
.statusForPriority(BucketPriority(3))
431+
.statusForPriority(StreamPriority(3))
437432
.hasSynced,
438433
isFalse);
439434
});
@@ -681,10 +676,9 @@ void _declareTests(String name, SyncOptions options, bool bson) {
681676
await expectProgress(status, total: progress(5, 10));
682677

683678
// Emulate the app closing - create a new independent sync client.
684-
await syncClient.abort();
679+
await database.disconnect();
685680
syncService.endCurrentListener();
686681

687-
createSyncClient();
688682
status = await waitForConnection();
689683

690684
// Send same checkpoint again
@@ -715,10 +709,9 @@ void _declareTests(String name, SyncOptions options, bool bson) {
715709
await expectProgress(status, total: progress(5, 10));
716710

717711
// Emulate the app closing - create a new independent sync client.
718-
await syncClient.abort();
712+
await database.disconnect();
719713
syncService.endCurrentListener();
720714

721-
createSyncClient();
722715
status = await waitForConnection();
723716

724717
// Send checkpoint with additional data
@@ -749,9 +742,9 @@ void _declareTests(String name, SyncOptions options, bool bson) {
749742

750743
// A sync rule deploy could reset buckets, making the new bucket smaller
751744
// than the existing one.
752-
await syncClient.abort();
745+
await database.disconnect();
753746
syncService.endCurrentListener();
754-
createSyncClient();
747+
755748
status = await waitForConnection();
756749
syncService.addLine({
757750
'checkpoint': Checkpoint(
@@ -770,8 +763,8 @@ void _declareTests(String name, SyncOptions options, bool bson) {
770763
await expectProgress(
771764
status,
772765
priorities: {
773-
BucketPriority(0): prio0,
774-
BucketPriority(2): prio2,
766+
StreamPriority(0): prio0,
767+
StreamPriority(2): prio2,
775768
},
776769
total: prio2,
777770
);
@@ -835,7 +828,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
835828
});
836829

837830
await expectLater(status, emits(isSyncStatus(downloading: true)));
838-
await syncClient.abort();
831+
await database.disconnect();
839832

840833
expect(syncService.controller.hasListener, isFalse);
841834
});
@@ -854,9 +847,6 @@ void _declareTests(String name, SyncOptions options, bool bson) {
854847
syncService.addLine({
855848
'checkpoint_complete': {'last_op_id': '10'}
856849
});
857-
858-
await pumpEventQueue();
859-
expect(syncService.controller.hasListener, isFalse);
860850
syncService.endCurrentListener();
861851

862852
// Should reconnect after delay.
@@ -876,9 +866,6 @@ void _declareTests(String name, SyncOptions options, bool bson) {
876866

877867
await expectLater(status, emits(isSyncStatus(downloading: true)));
878868
syncService.addKeepAlive(0);
879-
880-
await pumpEventQueue();
881-
expect(syncService.controller.hasListener, isFalse);
882869
syncService.endCurrentListener();
883870

884871
// Should reconnect after delay.

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import 'dart:async';
12
import 'dart:convert';
23

34
import 'package:http/http.dart';
45
import 'package:logging/logging.dart';
56
import 'package:powersync_core/powersync_core.dart';
7+
import 'package:powersync_core/src/abort_controller.dart';
8+
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
69
import 'package:powersync_core/src/sync/bucket_storage.dart';
710
import 'package:powersync_core/src/sync/internal_connector.dart';
811
import 'package:powersync_core/src/sync/options.dart';
@@ -63,20 +66,20 @@ Logger _makeTestLogger({Level level = Level.ALL, String? name}) {
6366
abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory {
6467
Future<CommonDatabase> openRawInMemoryDatabase();
6568

66-
Future<(CommonDatabase, PowerSyncDatabase)> openInMemoryDatabase() async {
69+
Future<(CommonDatabase, TestDatabase)> openInMemoryDatabase() async {
6770
final raw = await openRawInMemoryDatabase();
6871
return (raw, wrapRaw(raw));
6972
}
7073

71-
PowerSyncDatabase wrapRaw(
74+
TestDatabase wrapRaw(
7275
CommonDatabase raw, {
7376
Logger? logger,
7477
}) {
75-
return PowerSyncDatabase.withDatabase(
76-
schema: schema,
78+
return TestDatabase(
7779
database: SqliteDatabase.singleConnection(
7880
SqliteConnection.synchronousWrapper(raw)),
79-
logger: logger,
81+
logger: logger ?? Logger.detached('PowerSync.test'),
82+
schema: schema,
8083
);
8184
}
8285
}
@@ -147,6 +150,78 @@ class TestConnector extends PowerSyncBackendConnector {
147150
}
148151
}
149152

153+
final class TestDatabase
154+
with SqliteQueries, PowerSyncDatabaseMixin
155+
implements PowerSyncDatabase {
156+
@override
157+
final SqliteDatabase database;
158+
@override
159+
final Logger logger;
160+
@override
161+
Schema schema;
162+
163+
@override
164+
late final Future<void> isInitialized;
165+
166+
Client? httpClient;
167+
168+
TestDatabase({
169+
required this.database,
170+
required this.logger,
171+
required this.schema,
172+
}) {
173+
isInitialized = baseInit();
174+
}
175+
176+
@override
177+
Future<void> connectInternal({
178+
required PowerSyncBackendConnector connector,
179+
required ResolvedSyncOptions options,
180+
required List<SubscribedStream> initiallyActiveStreams,
181+
required Stream<List<SubscribedStream>> activeStreams,
182+
required AbortController abort,
183+
required Zone asyncWorkZone,
184+
}) async {
185+
final impl = StreamingSyncImplementation(
186+
adapter: BucketStorage(this),
187+
schemaJson: jsonEncode(schema),
188+
client: httpClient!,
189+
options: options,
190+
connector: InternalConnector.wrap(connector, this),
191+
logger: logger,
192+
crudUpdateTriggerStream: database
193+
.onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)),
194+
activeSubscriptions: initiallyActiveStreams,
195+
);
196+
impl.statusStream.listen(setStatus);
197+
198+
asyncWorkZone.run(impl.streamingSync);
199+
final subscriptions = activeStreams.listen(impl.updateSubscriptions);
200+
201+
abort.onAbort.then((_) async {
202+
subscriptions.cancel();
203+
await impl.abort();
204+
abort.completeAbort();
205+
}).ignore();
206+
}
207+
208+
@override
209+
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
210+
{String? debugContext, Duration? lockTimeout}) async {
211+
await isInitialized;
212+
return database.readLock(callback,
213+
debugContext: debugContext, lockTimeout: lockTimeout);
214+
}
215+
216+
@override
217+
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
218+
{String? debugContext, Duration? lockTimeout}) async {
219+
await isInitialized;
220+
return database.writeLock(callback,
221+
debugContext: debugContext, lockTimeout: lockTimeout);
222+
}
223+
}
224+
150225
extension MockSync on PowerSyncDatabase {
151226
StreamingSyncImplementation connectWithMockService(
152227
Client client,

0 commit comments

Comments
 (0)