Skip to content

Commit 7b22257

Browse files
committed
Track subscriptions across requests
1 parent 231b2d6 commit 7b22257

File tree

5 files changed

+134
-14
lines changed

5 files changed

+134
-14
lines changed

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class PowerSyncDatabaseImpl
143143
connector: connector,
144144
options: options.source,
145145
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
146+
subscriptions: initiallyActiveStreams,
146147
);
147148
} catch (e) {
148149
logger.warning(
@@ -159,6 +160,7 @@ class PowerSyncDatabaseImpl
159160
crudUpdateTriggerStream: crudStream,
160161
options: options,
161162
client: BrowserClient(),
163+
activeSubscriptions: initiallyActiveStreams,
162164
// Only allows 1 sync implementation to run at a time per database
163165
// This should be global (across tabs) when using Navigator locks.
164166
identifier: database.openFactory.path,
@@ -170,7 +172,10 @@ class PowerSyncDatabaseImpl
170172
});
171173
sync.streamingSync();
172174

175+
final subscriptions = activeStreams.listen(sync.updateSubscriptions);
176+
173177
abort.onAbort.then((_) async {
178+
subscriptions.cancel();
174179
await sync.abort();
175180
abort.completeAbort();
176181
}).ignore();

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ abstract interface class StreamingSync {
3030

3131
/// Close any active streams.
3232
Future<void> abort();
33+
34+
void updateSubscriptions(List<SubscribedStream> streams);
3335
}
3436

3537
@internal
@@ -129,6 +131,7 @@ class StreamingSyncImplementation implements StreamingSync {
129131
return _abort?.aborted ?? false;
130132
}
131133

134+
@override
132135
void updateSubscriptions(List<SubscribedStream> streams) {
133136
_activeSubscriptions = streams;
134137
if (_nonLineSyncEvents.hasListener) {

packages/powersync_core/lib/src/web/sync_controller.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class SyncWorkerHandle implements StreamingSync {
1515
final PowerSyncBackendConnector connector;
1616
final SyncOptions options;
1717
late final WorkerCommunicationChannel _channel;
18+
List<SubscribedStream> subscriptions;
1819

1920
final StreamController<SyncStatus> _status = StreamController.broadcast();
2021

@@ -24,6 +25,7 @@ class SyncWorkerHandle implements StreamingSync {
2425
required this.options,
2526
required MessagePort sendToWorker,
2627
required SharedWorker worker,
28+
required this.subscriptions,
2729
}) {
2830
_channel = WorkerCommunicationChannel(
2931
port: sendToWorker,
@@ -81,6 +83,7 @@ class SyncWorkerHandle implements StreamingSync {
8183
required PowerSyncBackendConnector connector,
8284
required Uri workerUri,
8385
required SyncOptions options,
86+
required List<SubscribedStream> subscriptions,
8487
}) async {
8588
final worker = SharedWorker(workerUri.toString().toJS);
8689
final handle = SyncWorkerHandle._(
@@ -89,6 +92,7 @@ class SyncWorkerHandle implements StreamingSync {
8992
connector: connector,
9093
sendToWorker: worker.port,
9194
worker: worker,
95+
subscriptions: subscriptions,
9296
);
9397

9498
// Make sure that the worker is working, or throw immediately.
@@ -116,6 +120,13 @@ class SyncWorkerHandle implements StreamingSync {
116120
database.database.openFactory.path,
117121
ResolvedSyncOptions(options),
118122
database.schema,
123+
subscriptions,
119124
);
120125
}
126+
127+
@override
128+
void updateSubscriptions(List<SubscribedStream> streams) {
129+
subscriptions = streams;
130+
_channel.updateSubscriptions(streams);
131+
}
121132
}

packages/powersync_core/lib/src/web/sync_worker.dart

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'dart:convert';
88
import 'dart:js_interop';
99

1010
import 'package:async/async.dart';
11+
import 'package:collection/collection.dart';
1112
import 'package:http/browser_client.dart';
1213
import 'package:logging/logging.dart';
1314
import 'package:powersync_core/powersync_core.dart';
@@ -45,15 +46,20 @@ class _SyncWorker {
4546
});
4647
}
4748

48-
_SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options,
49-
String schemaJson, _ConnectedClient client) {
49+
_SyncRunner referenceSyncTask(
50+
String databaseIdentifier,
51+
SyncOptions options,
52+
String schemaJson,
53+
List<SubscribedStream> subscriptions,
54+
_ConnectedClient client) {
5055
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
5156
return _SyncRunner(databaseIdentifier);
5257
})
5358
..registerClient(
5459
client,
5560
options,
5661
schemaJson,
62+
subscriptions,
5763
);
5864
}
5965
}
@@ -90,13 +96,20 @@ class _ConnectedClient {
9096
},
9197
);
9298

93-
_runner = _worker.referenceSyncTask(request.databaseName,
94-
recoveredOptions, request.schemaJson, this);
99+
_runner = _worker.referenceSyncTask(
100+
request.databaseName,
101+
recoveredOptions,
102+
request.schemaJson,
103+
request.subscriptions?.toDart ?? const [],
104+
this,
105+
);
95106
return (JSObject(), null);
96107
case SyncWorkerMessageType.abortSynchronization:
97108
_runner?.disconnectClient(this);
98109
_runner = null;
99110
return (JSObject(), null);
111+
case SyncWorkerMessageType.updateSubscriptions:
112+
return (JSObject(), null);
100113
default:
101114
throw StateError('Unexpected message type $type');
102115
}
@@ -137,9 +150,10 @@ class _SyncRunner {
137150
final StreamGroup<_RunnerEvent> _group = StreamGroup();
138151
final StreamController<_RunnerEvent> _mainEvents = StreamController();
139152

140-
StreamingSync? sync;
153+
StreamingSyncImplementation? sync;
141154
_ConnectedClient? databaseHost;
142-
final connections = <_ConnectedClient>[];
155+
final connections = <_ConnectedClient, List<SubscribedStream>>{};
156+
List<SubscribedStream> currentStreams = [];
143157

144158
_SyncRunner(this.identifier) {
145159
_group.add(_mainEvents.stream);
@@ -152,8 +166,9 @@ class _SyncRunner {
152166
:final client,
153167
:final options,
154168
:final schemaJson,
169+
:final subscriptions,
155170
):
156-
connections.add(client);
171+
connections[client] = subscriptions;
157172
final (newOptions, reconnect) = this.options.applyFrom(options);
158173
this.options = newOptions;
159174
this.schemaJson = schemaJson;
@@ -165,6 +180,8 @@ class _SyncRunner {
165180
sync?.abort();
166181
sync = null;
167182
await _requestDatabase(client);
183+
} else {
184+
reindexSubscriptions();
168185
}
169186
case _RemoveConnection(:final client):
170187
connections.remove(client);
@@ -191,6 +208,12 @@ class _SyncRunner {
191208
} else {
192209
await _requestDatabase(newHost);
193210
}
211+
case _ClientSubscriptionsChanged(
212+
:final client,
213+
:final subscriptions
214+
):
215+
connections[client] = subscriptions;
216+
reindexSubscriptions();
194217
}
195218
} catch (e, s) {
196219
_logger.warning('Error handling $event', e, s);
@@ -199,12 +222,22 @@ class _SyncRunner {
199222
});
200223
}
201224

225+
/// Updates [currentStreams] to the union of values in [connections].
226+
void reindexSubscriptions() {
227+
final before = currentStreams.toSet();
228+
final after = connections.values.flattenedToSet;
229+
if (!const SetEquality<SubscribedStream>().equals(before, after)) {
230+
currentStreams = after.toList();
231+
sync?.updateSubscriptions(currentStreams);
232+
}
233+
}
234+
202235
/// Pings all current [connections], removing those that don't answer in 5s
203236
/// (as they are likely closed tabs as well).
204237
///
205238
/// Returns the first client that responds (without waiting for others).
206239
Future<_ConnectedClient?> _collectActiveClients() async {
207-
final candidates = connections.toList();
240+
final candidates = connections.keys.toList();
208241
if (candidates.isEmpty) {
209242
return null;
210243
}
@@ -269,6 +302,7 @@ class _SyncRunner {
269302
);
270303
}
271304

305+
currentStreams = connections.values.flattenedToSet.toList();
272306
sync = StreamingSyncImplementation(
273307
adapter: WebBucketStorage(database),
274308
schemaJson: client._runner!.schemaJson,
@@ -283,20 +317,21 @@ class _SyncRunner {
283317
options: options,
284318
client: BrowserClient(),
285319
identifier: identifier,
320+
activeSubscriptions: currentStreams,
286321
);
287322
sync!.statusStream.listen((event) {
288323
_logger.fine('Broadcasting sync event: $event');
289-
for (final client in connections) {
324+
for (final client in connections.keys) {
290325
client.channel.notify(SyncWorkerMessageType.notifySyncStatus,
291326
SerializedSyncStatus.from(event));
292327
}
293328
});
294329
sync!.streamingSync();
295330
}
296331

297-
void registerClient(
298-
_ConnectedClient client, SyncOptions options, String schemaJson) {
299-
_mainEvents.add(_AddConnection(client, options, schemaJson));
332+
void registerClient(_ConnectedClient client, SyncOptions options,
333+
String schemaJson, List<SubscribedStream> subscriptions) {
334+
_mainEvents.add(_AddConnection(client, options, schemaJson, subscriptions));
300335
}
301336

302337
/// Remove a client, disconnecting if no clients remain..
@@ -308,6 +343,11 @@ class _SyncRunner {
308343
void disconnectClient(_ConnectedClient client) {
309344
_mainEvents.add(_DisconnectClient(client));
310345
}
346+
347+
void updateClientSubscriptions(
348+
_ConnectedClient client, List<SubscribedStream> subscriptions) {
349+
_mainEvents.add(_ClientSubscriptionsChanged(client, subscriptions));
350+
}
311351
}
312352

313353
sealed class _RunnerEvent {}
@@ -316,8 +356,10 @@ final class _AddConnection implements _RunnerEvent {
316356
final _ConnectedClient client;
317357
final SyncOptions options;
318358
final String schemaJson;
359+
final List<SubscribedStream> subscriptions;
319360

320-
_AddConnection(this.client, this.options, this.schemaJson);
361+
_AddConnection(
362+
this.client, this.options, this.schemaJson, this.subscriptions);
321363
}
322364

323365
final class _RemoveConnection implements _RunnerEvent {
@@ -332,6 +374,13 @@ final class _DisconnectClient implements _RunnerEvent {
332374
_DisconnectClient(this.client);
333375
}
334376

377+
final class _ClientSubscriptionsChanged implements _RunnerEvent {
378+
final _ConnectedClient client;
379+
final List<SubscribedStream> subscriptions;
380+
381+
_ClientSubscriptionsChanged(this.client, this.subscriptions);
382+
}
383+
335384
final class _ActiveDatabaseClosed implements _RunnerEvent {
336385
const _ActiveDatabaseClosed();
337386
}

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import 'package:web/web.dart';
99

1010
import '../connector.dart';
1111
import '../log.dart';
12+
import '../sync/streaming_sync.dart';
1213
import '../sync/sync_status.dart';
1314

1415
/// Names used in [SyncWorkerMessage]
@@ -20,6 +21,9 @@ enum SyncWorkerMessageType {
2021
/// If parameters change, the sync worker reconnects.
2122
startSynchronization,
2223

24+
/// Update the active subscriptions that this client is interested in.
25+
updateSubscriptions,
26+
2327
/// The [SyncWorkerMessage.payload] for the request is a numeric id, the
2428
/// response can be anything (void).
2529
/// This disconnects immediately, even if other clients are still open.
@@ -74,6 +78,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
7478
required String implementationName,
7579
required String schemaJson,
7680
String? syncParamsEncoded,
81+
UpdateSubscriptions? subscriptions,
7782
});
7883

7984
external String get databaseName;
@@ -83,6 +88,36 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
8388
external String? get implementationName;
8489
external String get schemaJson;
8590
external String? get syncParamsEncoded;
91+
external UpdateSubscriptions? get subscriptions;
92+
}
93+
94+
@anonymous
95+
extension type UpdateSubscriptions.__(JSObject _inner) implements JSObject {
96+
external factory UpdateSubscriptions._({
97+
required int requestId,
98+
required JSArray content,
99+
});
100+
101+
factory UpdateSubscriptions(int requestId, List<SubscribedStream> streams) {
102+
return UpdateSubscriptions._(
103+
requestId: requestId,
104+
content: streams
105+
.map((e) => <JSString>[e.name.toJS, e.parameters.toJS].toJS)
106+
.toList()
107+
.toJS,
108+
);
109+
}
110+
111+
external int get requestId;
112+
external JSArray get content;
113+
114+
List<SubscribedStream> get toDart {
115+
return content.toDart.map((e) {
116+
final [name, parameters] = (e as JSArray<JSString>).toDart;
117+
118+
return (name: name.toDart, parameters: parameters.toDart);
119+
}).toList();
120+
}
86121
}
87122

88123
@anonymous
@@ -339,6 +374,8 @@ final class WorkerCommunicationChannel {
339374
return;
340375
case SyncWorkerMessageType.startSynchronization:
341376
requestId = (message.payload as StartSynchronization).requestId;
377+
case SyncWorkerMessageType.updateSubscriptions:
378+
requestId = (message.payload as UpdateSubscriptions).requestId;
342379
case SyncWorkerMessageType.requestEndpoint:
343380
case SyncWorkerMessageType.abortSynchronization:
344381
case SyncWorkerMessageType.credentialsCallback:
@@ -413,7 +450,11 @@ final class WorkerCommunicationChannel {
413450
}
414451

415452
Future<void> startSynchronization(
416-
String databaseName, ResolvedSyncOptions options, Schema schema) async {
453+
String databaseName,
454+
ResolvedSyncOptions options,
455+
Schema schema,
456+
List<SubscribedStream> streams,
457+
) async {
417458
final (id, completion) = _newRequest();
418459
port.postMessage(SyncWorkerMessage(
419460
type: SyncWorkerMessageType.startSynchronization.name,
@@ -428,11 +469,22 @@ final class WorkerCommunicationChannel {
428469
null => null,
429470
final params => jsonEncode(params),
430471
},
472+
subscriptions: UpdateSubscriptions(-1, streams),
431473
),
432474
));
433475
await completion;
434476
}
435477

478+
Future<void> updateSubscriptions(List<SubscribedStream> streams) async {
479+
final (id, completion) = _newRequest();
480+
port.postMessage(SyncWorkerMessage(
481+
type: SyncWorkerMessageType.updateSubscriptions.name,
482+
payload: UpdateSubscriptions(id, streams),
483+
));
484+
485+
await completion;
486+
}
487+
436488
Future<void> abortSynchronization() async {
437489
await _numericRequest(SyncWorkerMessageType.abortSynchronization);
438490
}

0 commit comments

Comments
 (0)