Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion demos/supabase-todolist/lib/powersync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ Future<String> getDatabasePath() async {
return join(dir.path, dbFilename);
}

const options = SyncOptions(syncImplementation: SyncClientImplementation.rust);
const options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'app_version': '1.0.1'});

Future<void> openDatabase() async {
// Open the local database
Expand Down
12 changes: 12 additions & 0 deletions packages/powersync_core/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import 'package:meta/meta.dart';

/// Options that affect how the sync client connects to the sync service.
final class SyncOptions {
/// A map of application metadata that is passed to the PowerSync service.
///
/// Application metadata that will be displayed in PowerSync service logs.
final Map<String, String>? appMetadata;

/// A JSON object that is passed to the sync service and forwarded to sync
/// rules.
///
Expand Down Expand Up @@ -39,19 +44,22 @@ final class SyncOptions {
this.params,
this.syncImplementation = SyncClientImplementation.defaultClient,
this.includeDefaultStreams,
this.appMetadata,
});

SyncOptions _copyWith({
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return SyncOptions(
crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime,
retryDelay: retryDelay,
params: params ?? this.params,
syncImplementation: syncImplementation,
includeDefaultStreams: includeDefaultStreams,
appMetadata: appMetadata ?? this.appMetadata,
);
}
}
Expand Down Expand Up @@ -88,14 +96,18 @@ extension type ResolvedSyncOptions(SyncOptions source) {
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return ResolvedSyncOptions((source ?? SyncOptions())._copyWith(
crudThrottleTime: crudThrottleTime,
retryDelay: retryDelay,
params: params,
appMetadata: appMetadata,
));
}

Map<String, String> get appMetadata => source.appMetadata ?? const {};

Duration get crudThrottleTime =>
source.crudThrottleTime ?? const Duration(milliseconds: 10);

Expand Down
7 changes: 5 additions & 2 deletions packages/powersync_core/lib/src/sync/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ class StreamingSyncRequest {
bool includeChecksum = true;
String clientId;
Map<String, dynamic>? parameters;
Map<String, String>? appMetadata;

StreamingSyncRequest(this.buckets, this.parameters, this.clientId);
StreamingSyncRequest(
this.buckets, this.parameters, this.clientId, this.appMetadata);

Map<String, dynamic> toJson() {
final Map<String, dynamic> json = {
'buckets': buckets,
'include_checksum': includeChecksum,
'raw_data': true,
'client_id': clientId
'client_id': clientId,
'app_metadata': appMetadata,
};

if (parameters != null) {
Expand Down
9 changes: 5 additions & 4 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import 'package:powersync_core/src/sync/options.dart';
import 'package:powersync_core/src/user_agent/user_agent.dart';
import 'package:sqlite_async/mutex.dart';

import 'bucket_storage.dart';
import '../crud.dart';
import 'bucket_storage.dart';
import 'instruction.dart';
import 'internal_connector.dart';
import 'mutable_sync_status.dart';
import 'protocol.dart';
import 'stream_utils.dart';
import 'sync_status.dart';
import 'protocol.dart';

typedef SubscribedStream = ({String name, String parameters});

Expand Down Expand Up @@ -339,8 +339,8 @@ class StreamingSyncImplementation implements StreamingSync {

Checkpoint? targetCheckpoint;

var requestStream = _streamingSyncRequest(
StreamingSyncRequest(bucketRequests, options.params, clientId!))
var requestStream = _streamingSyncRequest(StreamingSyncRequest(
bucketRequests, options.params, clientId!, options.appMetadata))
.map(ReceivedLine.new);

var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);
Expand Down Expand Up @@ -633,6 +633,7 @@ final class _ActiveRustStreamingIteration {
await _control(
'start',
convert.json.encode({
'app_metadata': sync.options.appMetadata,
'parameters': sync.options.params,
'schema': convert.json.decode(sync.schemaJson),
'include_defaults': sync.options.includeDefaultStreams,
Expand Down
38 changes: 37 additions & 1 deletion packages/powersync_core/test/sync/stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'dart:convert';
import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';

import 'package:test/test.dart';

import '../server/sync_server/in_memory_sync_server.dart';
Expand Down Expand Up @@ -260,4 +259,41 @@ void main() {
);
a.unsubscribe();
});

test('passes app metadata to the server (rust)', () async {
options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});

test('passes app metadata to the server (legacy sync client)', () async {
options = SyncOptions(
// ignore: deprecated_member_use_from_same_package
syncImplementation: SyncClientImplementation.dart,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});
}