Skip to content

Commit c1d0c54

Browse files
committed
Register stream subscriptions
1 parent 6ec3d72 commit c1d0c54

File tree

9 files changed

+157
-117
lines changed

9 files changed

+157
-117
lines changed

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ class PowerSyncDatabaseImpl
133133
Future<void> connectInternal({
134134
required PowerSyncBackendConnector connector,
135135
required ResolvedSyncOptions options,
136+
required List<SubscribedStream> initiallyActiveStreams,
137+
required Stream<List<({String name, String parameters})>> activeStreams,
136138
required AbortController abort,
137139
required Zone asyncWorkZone,
138140
}) async {

packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import 'package:powersync_core/src/abort_controller.dart';
77
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
88
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
99
import '../sync/options.dart';
10+
import '../sync/streaming_sync.dart';
1011
import 'powersync_database.dart';
1112

1213
import '../connector.dart';
@@ -115,6 +116,8 @@ class PowerSyncDatabaseImpl
115116
Future<void> connectInternal({
116117
required PowerSyncBackendConnector connector,
117118
required AbortController abort,
119+
required List<SubscribedStream> initiallyActiveStreams,
120+
required Stream<List<SubscribedStream>> activeStreams,
118121
required Zone asyncWorkZone,
119122
required ResolvedSyncOptions options,
120123
}) {

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import 'package:powersync_core/src/sync/options.dart';
1818
import 'package:powersync_core/src/sync/sync_status.dart';
1919

2020
import '../sync/stream.dart';
21+
import '../sync/streaming_sync.dart';
2122

2223
mixin PowerSyncDatabaseMixin implements SqliteConnection {
2324
/// Schema used for the local database.
@@ -130,7 +131,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
130131
return isInitialized;
131132
}
132133

133-
Future<List<SyncStreamSubscription>> get activeSubscriptions {
134+
Future<List<SyncStream>> get subscribedStreams {
134135
throw UnimplementedError();
135136
}
136137

@@ -272,6 +273,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
272273
Future<void> connectInternal({
273274
required PowerSyncBackendConnector connector,
274275
required ResolvedSyncOptions options,
276+
required List<SubscribedStream> initiallyActiveStreams,
277+
required Stream<List<SubscribedStream>> activeStreams,
275278
required AbortController abort,
276279
required Zone asyncWorkZone,
277280
});

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ class PowerSyncDatabaseImpl
128128
Future<void> connectInternal({
129129
required PowerSyncBackendConnector connector,
130130
required AbortController abort,
131+
required List<SubscribedStream> initiallyActiveStreams,
132+
required Stream<List<SubscribedStream>> activeStreams,
131133
required Zone asyncWorkZone,
132134
required ResolvedSyncOptions options,
133135
}) async {

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 110 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,28 @@ import 'package:powersync_core/src/database/powersync_db_mixin.dart';
99
import 'package:powersync_core/src/sync/options.dart';
1010
import 'package:powersync_core/src/sync/stream.dart';
1111

12+
import 'streaming_sync.dart';
13+
14+
/// A (stream name, JSON parameters) pair that uniquely identifies a stream
15+
/// instantiation to subscribe to.
16+
typedef _RawStreamKey = (String, String);
17+
1218
@internal
1319
final class ConnectionManager {
1420
final PowerSyncDatabaseMixin db;
1521
final ActiveDatabaseGroup _activeGroup;
1622

23+
/// All streams (with parameters) for which a subscription has been requested
24+
/// explicitly.
25+
final Map<_RawStreamKey, _ActiveSubscription> _locallyActiveSubscriptions =
26+
{};
27+
1728
final StreamController<SyncStatus> _statusController = StreamController();
29+
30+
/// Fires when an entry is added or removed from [_locallyActiveSubscriptions]
31+
/// while we're connected.
32+
StreamController<void>? _subscriptionsChanged;
33+
1834
SyncStatus _currentStatus =
1935
const SyncStatus(connected: false, lastSyncedAt: null);
2036

@@ -29,9 +45,6 @@ final class ConnectionManager {
2945
/// sync mutex.
3046
AbortController? _abortActiveSync;
3147

32-
/// Only to be called in the sync mutex.
33-
Future<void> Function()? _connectWithLastOptions;
34-
3548
ConnectionManager(this.db) : _activeGroup = db.group;
3649

3750
void checkNotConnected() {
@@ -60,7 +73,8 @@ final class ConnectionManager {
6073
// connecting and disconnecting.
6174
await _activeGroup.syncConnectMutex.lock(() async {
6275
await _abortCurrentSync();
63-
_connectWithLastOptions = null;
76+
_subscriptionsChanged?.close();
77+
_subscriptionsChanged = null;
6478
});
6579

6680
manuallyChangeSyncStatus(
@@ -78,18 +92,10 @@ final class ConnectionManager {
7892
}
7993
}
8094

81-
Future<void> reconnect() async {
82-
// Also wrap this in the sync mutex to ensure there's no race between us
83-
// connecting and disconnecting.
84-
await _activeGroup.syncConnectMutex.lock(() async {
85-
if (_connectWithLastOptions case final activeSync?) {
86-
await _abortCurrentSync();
87-
assert(_abortActiveSync == null);
88-
89-
await activeSync();
90-
}
91-
});
92-
}
95+
List<SubscribedStream> get _subscribedStreams => [
96+
for (final active in _locallyActiveSubscriptions.values)
97+
(name: active.name, parameters: active.encodedParameters)
98+
];
9399

94100
Future<void> connect({
95101
required PowerSyncBackendConnector connector,
@@ -106,6 +112,8 @@ final class ConnectionManager {
106112

107113
late void Function() retryHandler;
108114

115+
final subscriptionsChanged = StreamController<void>();
116+
109117
Future<void> connectWithSyncLock() async {
110118
// Ensure there has not been a subsequent connect() call installing a new
111119
// sync client.
@@ -117,6 +125,10 @@ final class ConnectionManager {
117125
connector: connector,
118126
options: options,
119127
abort: thisConnectAborter,
128+
initiallyActiveStreams: _subscribedStreams,
129+
activeStreams: subscriptionsChanged.stream.map((_) {
130+
return _subscribedStreams;
131+
}),
120132
// Run follow-up async tasks in the parent zone, a new one is introduced
121133
// while we hold the lock (and async tasks won't hold the sync lock).
122134
asyncWorkZone: zone,
@@ -148,7 +160,7 @@ final class ConnectionManager {
148160
// Disconnect a previous sync client, if one is active.
149161
await _abortCurrentSync();
150162
assert(_abortActiveSync == null);
151-
_connectWithLastOptions = connectWithSyncLock;
163+
_subscriptionsChanged = subscriptionsChanged;
152164

153165
// Install the abort controller for this particular connect call, allowing
154166
// it to be disconnected.
@@ -187,78 +199,70 @@ final class ConnectionManager {
187199
}
188200
}
189201

202+
_SyncStreamSubscriptionHandle _referenceStreamSubscription(
203+
String stream, Map<String, Object?>? parameters) {
204+
final key = (stream, json.encode(parameters));
205+
_ActiveSubscription active;
206+
207+
if (_locallyActiveSubscriptions[key] case final current?) {
208+
active = current;
209+
} else {
210+
active = _ActiveSubscription(this,
211+
name: stream, parameters: parameters, encodedParameters: key.$2);
212+
_locallyActiveSubscriptions[key] = active;
213+
_subscriptionsChanged?.add(null);
214+
}
215+
216+
return _SyncStreamSubscriptionHandle(active);
217+
}
218+
219+
void _clearSubscription(_ActiveSubscription subscription) {
220+
assert(subscription.refcount == 0);
221+
_locallyActiveSubscriptions
222+
.remove((subscription.name, subscription.encodedParameters));
223+
_subscriptionsChanged?.add(null);
224+
}
225+
190226
Future<void> _subscriptionsCommand(Object? command) async {
191227
await db.writeTransaction((tx) {
192228
return db.execute(
193229
'SELECT powersync_control(?, ?)',
194230
['subscriptions', json.encode(command)],
195231
);
196232
});
197-
198-
await reconnect();
233+
_subscriptionsChanged?.add(null);
199234
}
200235

201236
Future<void> subscribe({
202237
required String stream,
203-
required Object? parameters,
238+
required Map<String, Object?>? parameters,
204239
Duration? ttl,
205-
BucketPriority? priority,
240+
StreamPriority? priority,
206241
}) async {
207242
await _subscriptionsCommand({
208243
'subscribe': {
209-
'stream': stream,
210-
'params': parameters,
244+
'stream': {
245+
'name': stream,
246+
'params': parameters,
247+
},
211248
'ttl': ttl?.inSeconds,
212249
'priority': priority,
213250
},
214251
});
215252
}
216253

217-
Future<void> unsubscribe({
254+
Future<void> unsubscribeAll({
218255
required String stream,
219256
required Object? parameters,
220257
}) async {
221258
await _subscriptionsCommand({
222259
'unsubscribe': {
223-
'stream': stream,
260+
'name': stream,
224261
'params': parameters,
225262
},
226263
});
227264
}
228265

229-
Future<SyncStreamSubscription?> resolveCurrent(
230-
String name, Map<String, Object?>? parameters) async {
231-
final row = await db.getOptional(
232-
'SELECT stream_name, active, is_default, local_priority, local_params, expires_at, last_synced_at, ttl FROM ps_stream_subscriptions WHERE stream_name = ? AND local_params = ?',
233-
[name, json.encode(parameters)],
234-
);
235-
236-
if (row == null) {
237-
return null;
238-
}
239-
240-
return _SyncStreamSubscription(
241-
this,
242-
name: name,
243-
parameters:
244-
json.decode(row['local_params'] as String) as Map<String, Object?>?,
245-
active: row['active'] != 0,
246-
isDefault: row['is_default'] != 0,
247-
hasExplicitSubscription: row['ttl'] != null,
248-
expiresAt: switch (row['expires_at']) {
249-
null => null,
250-
final expiresAt as int =>
251-
DateTime.fromMicrosecondsSinceEpoch(expiresAt * 1000),
252-
},
253-
hasSynced: row['has_synced'] != 0,
254-
lastSyncedAt: switch (row['last_synced_at']) {
255-
null => null,
256-
final lastSyncedAt as int =>
257-
DateTime.fromMicrosecondsSinceEpoch(lastSyncedAt * 1000),
258-
},
259-
);
260-
}
261-
262266
SyncStream syncStream(String name, Map<String, Object?>? parameters) {
263267
return _SyncStreamImplementation(this, name, parameters);
264268
}
@@ -280,70 +284,81 @@ final class _SyncStreamImplementation implements SyncStream {
280284
_SyncStreamImplementation(this._connections, this.name, this.parameters);
281285

282286
@override
283-
Future<SyncStreamSubscription?> get current {
284-
return _connections.resolveCurrent(name, parameters);
285-
}
286-
287-
@override
288-
Future<void> subscribe({
287+
Future<SyncStreamSubscription> subscribe({
289288
Duration? ttl,
290-
BucketPriority? priority,
289+
StreamPriority? priority,
291290
}) async {
292291
await _connections.subscribe(
293292
stream: name,
294293
parameters: parameters,
295294
ttl: ttl,
296295
priority: priority,
297296
);
297+
298+
return _connections._referenceStreamSubscription(name, parameters);
299+
}
300+
301+
@override
302+
Future<void> unsubscribeAll() async {
303+
await _connections.unsubscribeAll(stream: name, parameters: parameters);
298304
}
299305
}
300306

301-
final class _SyncStreamSubscription implements SyncStreamSubscription {
302-
final ConnectionManager _connections;
307+
final class _ActiveSubscription {
308+
final ConnectionManager connections;
309+
var refcount = 0;
303310

304-
@override
305311
final String name;
306-
@override
312+
final String encodedParameters;
307313
final Map<String, Object?>? parameters;
308314

309-
@override
310-
final bool active;
311-
@override
312-
final bool isDefault;
313-
@override
314-
final bool hasExplicitSubscription;
315-
@override
316-
final DateTime? expiresAt;
317-
@override
318-
final bool hasSynced;
319-
@override
320-
final DateTime? lastSyncedAt;
321-
322-
_SyncStreamSubscription(
323-
this._connections, {
315+
_ActiveSubscription(
316+
this.connections, {
324317
required this.name,
318+
required this.encodedParameters,
325319
required this.parameters,
326-
required this.active,
327-
required this.isDefault,
328-
required this.hasExplicitSubscription,
329-
required this.expiresAt,
330-
required this.hasSynced,
331-
required this.lastSyncedAt,
332320
});
333321

322+
void decrementRefCount() {
323+
refcount--;
324+
if (refcount == 0) {
325+
connections._clearSubscription(this);
326+
}
327+
}
328+
}
329+
330+
final class _SyncStreamSubscriptionHandle implements SyncStreamSubscription {
331+
final _ActiveSubscription _source;
332+
333+
_SyncStreamSubscriptionHandle(this._source) {
334+
_source.refcount++;
335+
336+
// This is not unreliable, but can help decrementing refcounts on the inner
337+
// subscription when this handle is deallocated without [unsubscribe] being
338+
// called.
339+
_finalizer.attach(this, _source, detach: this);
340+
}
341+
342+
@override
343+
String get name => _source.name;
344+
345+
@override
346+
Map<String, Object?>? get parameters => _source.parameters;
347+
334348
@override
335349
Future<void> unsubscribe() async {
336-
await _connections.unsubscribe(stream: name, parameters: parameters);
350+
_finalizer.detach(this);
351+
_source.decrementRefCount();
337352
}
338353

339354
@override
340355
Future<void> waitForFirstSync() async {
341-
if (hasSynced) {
342-
return;
343-
}
344-
return _connections.firstStatusMatching((status) {
356+
return _source.connections.firstStatusMatching((status) {
345357
final currentProgress = status.statusFor(this);
346358
return currentProgress?.subscription.hasSynced ?? false;
347359
});
348360
}
361+
362+
static final Finalizer<_ActiveSubscription> _finalizer =
363+
Finalizer((sub) => sub.decrementRefCount());
349364
}

0 commit comments

Comments
 (0)