Skip to content

Commit 231b2d6

Browse files
committed
Native: Propagate stream subscriptions
1 parent c1d0c54 commit 231b2d6

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ class PowerSyncDatabaseImpl
142142

143143
bool triedSpawningIsolate = false;
144144
StreamSubscription<UpdateNotification>? crudUpdateSubscription;
145+
StreamSubscription<void>? activeStreamsSubscription;
145146
final receiveMessages = ReceivePort();
146147
final receiveUnhandledErrors = ReceivePort();
147148
final receiveExit = ReceivePort();
@@ -159,6 +160,7 @@ class PowerSyncDatabaseImpl
159160

160161
// Cleanup
161162
crudUpdateSubscription?.cancel();
163+
activeStreamsSubscription?.cancel();
162164
receiveMessages.close();
163165
receiveUnhandledErrors.close();
164166
receiveExit.close();
@@ -200,6 +202,10 @@ class PowerSyncDatabaseImpl
200202
crudUpdateSubscription = crudStream.listen((event) {
201203
port.send(['update']);
202204
});
205+
206+
activeStreamsSubscription = activeStreams.listen((streams) {
207+
port.send(['changed_subscriptions', streams]);
208+
});
203209
} else if (action == 'uploadCrud') {
204210
await (data[1] as PortCompleter).handle(() async {
205211
await connector.uploadData(this);
@@ -368,6 +374,9 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
368374
}
369375
} else if (action == 'close') {
370376
await shutdown();
377+
} else if (action == 'changed_subscriptions') {
378+
openedStreamingSync
379+
?.updateSubscriptions(action[1] as List<SubscribedStream>);
371380
}
372381
}
373382
});

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class StreamingSyncImplementation implements StreamingSync {
3838
final BucketStorage adapter;
3939
final InternalConnector connector;
4040
final ResolvedSyncOptions options;
41-
final List<SubscribedStream> activeSubscriptions;
41+
List<SubscribedStream> _activeSubscriptions;
4242

4343
final Logger logger;
4444

@@ -72,7 +72,7 @@ class StreamingSyncImplementation implements StreamingSync {
7272
required this.crudUpdateTriggerStream,
7373
required this.options,
7474
required http.Client client,
75-
this.activeSubscriptions = const [],
75+
List<SubscribedStream> activeSubscriptions = const [],
7676
Mutex? syncMutex,
7777
Mutex? crudMutex,
7878
Logger? logger,
@@ -84,7 +84,8 @@ class StreamingSyncImplementation implements StreamingSync {
8484
syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"),
8585
crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"),
8686
_userAgentHeaders = userAgentHeaders(),
87-
logger = logger ?? isolateLogger;
87+
logger = logger ?? isolateLogger,
88+
_activeSubscriptions = activeSubscriptions;
8889

8990
Duration get _retryDelay => options.retryDelay;
9091

@@ -128,6 +129,13 @@ class StreamingSyncImplementation implements StreamingSync {
128129
return _abort?.aborted ?? false;
129130
}
130131

132+
void updateSubscriptions(List<SubscribedStream> streams) {
133+
_activeSubscriptions = streams;
134+
if (_nonLineSyncEvents.hasListener) {
135+
_nonLineSyncEvents.add(const AbortCurrentIteration());
136+
}
137+
}
138+
131139
@override
132140
Future<void> streamingSync() async {
133141
try {
@@ -609,7 +617,7 @@ final class _ActiveRustStreamingIteration {
609617
'parameters': sync.options.params,
610618
'schema': convert.json.decode(sync.schemaJson),
611619
'include_defaults': sync.options.includeDefaultStreams,
612-
'active_streams': sync.activeSubscriptions
620+
'active_streams': sync._activeSubscriptions
613621
.map((s) => {'name': s.name, 'params': s.parameters})
614622
}),
615623
);

0 commit comments

Comments
 (0)