From fa16022a0646d7538f7cc9610bd568bd6fcd65a5 Mon Sep 17 00:00:00 2001 From: Alex Dochioiu Date: Wed, 13 Mar 2024 15:35:10 +0200 Subject: [PATCH 1/2] wip --- drift/lib/src/remote/protocol.dart | 3 +- .../lib/src/web/wasm_setup/shared_worker.dart | 23 +++ drift/lib/web/worker.dart | 137 +++++++++++++++--- drift/pubspec.yaml | 3 +- 4 files changed, 144 insertions(+), 22 deletions(-) diff --git a/drift/lib/src/remote/protocol.dart b/drift/lib/src/remote/protocol.dart index d6ddb99fd..f1eacb511 100644 --- a/drift/lib/src/remote/protocol.dart +++ b/drift/lib/src/remote/protocol.dart @@ -54,7 +54,8 @@ class DriftProtocol { } Message deserialize(Object message) { - if (message is! List) throw const FormatException('Cannot read message'); + if (message is! List) + throw FormatException('Cannot read message ${message.runtimeType}'); final tag = message[0]; final id = message[1] as int; diff --git a/drift/lib/src/web/wasm_setup/shared_worker.dart b/drift/lib/src/web/wasm_setup/shared_worker.dart index c1a183877..bc633817d 100644 --- a/drift/lib/src/web/wasm_setup/shared_worker.dart +++ b/drift/lib/src/web/wasm_setup/shared_worker.dart @@ -1,6 +1,8 @@ // ignore_for_file: public_member_api_docs import 'dart:async'; +import 'dart:js'; import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; import 'package:web/web.dart'; @@ -9,6 +11,12 @@ import 'protocol.dart'; import 'shared.dart'; import 'types.dart'; +extension type ChromeConnectInfo._(JSObject _) implements JSObject { + external String get name; + + external ChromeConnectInfo({required String name}); +} + class SharedDriftWorker { final SharedWorkerGlobalScope self; @@ -23,10 +31,25 @@ class SharedDriftWorker { void start() { const event = EventStreamProviders.connectEvent; + event.forTarget(self).listen((e) => _newConnection(e as MessageEvent)); + + final port = ((self.getProperty("chrome".toJS) as JSObject) + .getProperty("runtime".toJS) as JSObject) + .callMethod("connect".toJS, ChromeConnectInfo(name: "drift")); + _newChromeConnection(port as MessagePort); + } + + void _newChromeConnection(MessagePort clientPort) async { + print("Chrome connect EVENT: "); + clientPort.start(); + EventStreamProviders.messageEvent + .forTarget(clientPort) + .listen((event) => _messageFromClient(clientPort, event)); } void _newConnection(MessageEvent event) async { + print("Connect EVENT: "); final clientPort = event.ports.toDart[0]; clientPort.start(); EventStreamProviders.messageEvent diff --git a/drift/lib/web/worker.dart b/drift/lib/web/worker.dart index 510055c2c..a59463284 100644 --- a/drift/lib/web/worker.dart +++ b/drift/lib/web/worker.dart @@ -4,10 +4,15 @@ /// For more details on how to use this library, see [the documentation]. /// /// [the documentation]: https://drift.simonbinder.eu/web/#using-web-workers +// ignore_for_file: public_member_api_docs + library drift.web.workers; import 'dart:async'; import 'dart:html'; +// import 'dart:html'; +import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; import 'package:async/async.dart'; import 'package:drift/drift.dart'; @@ -115,18 +120,7 @@ enum DriftWorkerMode { /// contains additional information and an example on how to use workers with /// Dart and Drift. void driftWorkerMain(QueryExecutor Function() openConnection) { - final self = WorkerGlobalScope.instance; - _RunningDriftWorker worker; - - if (self is SharedWorkerGlobalScope) { - worker = _RunningDriftWorker(true, openConnection); - } else if (self is DedicatedWorkerGlobalScope) { - worker = _RunningDriftWorker(false, openConnection); - } else { - throw StateError('This worker is neither a shared nor a dedicated worker'); - } - - worker.start(); + _RunningDriftWorker(true, openConnection).start(); } /// Spawn or connect to a web worker written with [driftWorkerMain]. @@ -154,10 +148,16 @@ Future connectToDriftWorker(String workerJsUri, worker.postMessage(webChannel.port1, [webChannel.port1]); channel = webChannel.port2.channel(); } else { - final worker = SharedWorker(workerJsUri, 'drift database'); - final port = worker.port!; - - var didGetInitializationResponse = false; + final chrome = + (WorkerGlobalScope.instance as JSObject).getProperty("chrome".toJS); + final runtime = (chrome as JSObject).getProperty("runtime".toJS); + final jsPort = (runtime as JSObject) + .callMethod("connect".toJS, ChromeConnectInfo(name: "drift database")); + final port = jsPort as ChromeRuntimePort; + // final worker = SharedWorker(workerJsUri, 'drift database'); + // final port = worker.port!; + + var didGetInitializationResponse = true; port.postMessage(mode.name); channel = port.channel().transformStream(StreamTransformer.fromHandlers( handleData: (data, sink) { @@ -185,6 +185,92 @@ Future connectToDriftWorker(String workerJsUri, return connectToRemoteAndInitialize(channel); } +extension type ChromeConnectInfo._(JSObject _) implements JSObject { + external String get name; + + external ChromeConnectInfo({required String name}); +} + +@JS() +@staticInterop +class ChromeRuntimePort {} + +extension ChromeRuntimePortX on ChromeRuntimePort { + external void disconnect(); + @JS('postMessage') + external void _postMessage(JSAny? message); + + void postMessage(Object? message) => _postMessage(message.jsify()); + + external void close(); + + external String get name; + + external ChromePortOnMessage get onMessage; +} + +@JS() +@staticInterop +class ChromePortOnMessage {} + +@JS() +@staticInterop +class ChromePortOnDisconnect {} + +extension ChromeRuntimePortOnMessageX on ChromePortOnMessage { + @JS('addListener') + external void _addListener(JSExportedDartFunction listener); + // external void _removeListener(JSExportedDartFunction listener); + + void addListener(ChromePortOnMessageListener listener) => + _addListener(listener.toJS); +} + +typedef ChromePortOnMessageListener = void Function( + JSAny? message, ChromeRuntimePort port); + +const _disconnectMessage = '_disconnect'; + +/// Extension to transform a raw [MessagePort] from web workers into a Dart +/// [StreamChannel]. +extension PortToChannel on ChromeRuntimePort { + /// Converts this port to a two-way communication channel, exposed as a + /// [StreamChannel]. + /// + /// This can be used to implement a remote database connection over service + /// workers. + /// + /// The [explicitClose] parameter can be used to control whether a close + /// message should be sent through the channel when it is closed. This will + /// cause it to be closed on the other end as well. Note that this is not a + /// reliable way of determining channel closures though, as there is no event + /// for channels being closed due to a tab or worker being closed. + /// Both "ends" of a JS channel calling [channel] on their part must use the + /// value for [explicitClose]. + StreamChannel channel({bool explicitClose = false}) { + final controller = StreamChannelController(); + onMessage.addListener((message, port) { + if (explicitClose && message == _disconnectMessage) { + // Other end has closed the connection + controller.local.sink.close(); + } else { + controller.local.sink.add(message.dartify()); + } + }); + + controller.local.stream.listen(postMessage, onDone: () { + // Closed locally, inform the other end. + if (explicitClose) { + postMessage(_disconnectMessage); + } + + close(); + }); + + return controller.foreign; + } +} + class _RunningDriftWorker { final bool isShared; final QueryExecutor Function() connectionFactory; @@ -196,10 +282,22 @@ class _RunningDriftWorker { _RunningDriftWorker(this.isShared, this.connectionFactory); void start() { + print("DOING DRIFT START"); if (isShared) { - const event = EventStreamProvider('connect'); - event.forTarget(self).listen(_newConnection); + final chrome = (self as JSObject).getProperty("chrome".toJS); + final runtime = (chrome as JSObject).getProperty("runtime".toJS); + final onConnect = (runtime as JSObject).getProperty("onConnect".toJS); + (onConnect as JSObject).callMethod( + "addListener".toJS, + ((ChromeRuntimePort port) { + print("CONNECTED"); + _newConnection(port); + }).toJS, + ); + // const event = EventStreamProvider('connect'); + // event.forTarget(self).listen(_newConnection); } else { + print("DRIFT START NOT SHARED"); const event = EventStreamProvider('message'); event.forTarget(self).map((e) => e.data).listen(_handleMessage); } @@ -226,9 +324,8 @@ class _RunningDriftWorker { } /// Handle a new connection, which implies that this worker is shared. - void _newConnection(MessageEvent event) { + void _newConnection(ChromeRuntimePort outgoingPort) { assert(isShared); - final outgoingPort = event.ports.first; // We still don't know whether this shared worker is supposed to host the // server itself or whether this is delegated to a dedicated worker managed diff --git a/drift/pubspec.yaml b/drift/pubspec.yaml index ba3bf91a7..3b6cb51e4 100644 --- a/drift/pubspec.yaml +++ b/drift/pubspec.yaml @@ -27,7 +27,8 @@ dev_dependencies: build_runner_core: ^7.0.0 build_verify: ^3.0.0 build_web_compilers: ^4.0.3 - drift_dev: any + drift_dev: + path: ../drift_dev drift_testcases: path: ../extras/integration_tests/drift_testcases http: ^0.13.4 From 479d404a94b12187d39a6c25c1bfabab1f7ddf78 Mon Sep 17 00:00:00 2001 From: Alex Dochioiu Date: Thu, 14 Mar 2024 10:00:34 +0200 Subject: [PATCH 2/2] --wip-- [skip ci] --- .../lib/src/web/wasm_setup/shared_worker.dart | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/drift/lib/src/web/wasm_setup/shared_worker.dart b/drift/lib/src/web/wasm_setup/shared_worker.dart index bc633817d..c1a183877 100644 --- a/drift/lib/src/web/wasm_setup/shared_worker.dart +++ b/drift/lib/src/web/wasm_setup/shared_worker.dart @@ -1,8 +1,6 @@ // ignore_for_file: public_member_api_docs import 'dart:async'; -import 'dart:js'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'package:web/web.dart'; @@ -11,12 +9,6 @@ import 'protocol.dart'; import 'shared.dart'; import 'types.dart'; -extension type ChromeConnectInfo._(JSObject _) implements JSObject { - external String get name; - - external ChromeConnectInfo({required String name}); -} - class SharedDriftWorker { final SharedWorkerGlobalScope self; @@ -31,25 +23,10 @@ class SharedDriftWorker { void start() { const event = EventStreamProviders.connectEvent; - event.forTarget(self).listen((e) => _newConnection(e as MessageEvent)); - - final port = ((self.getProperty("chrome".toJS) as JSObject) - .getProperty("runtime".toJS) as JSObject) - .callMethod("connect".toJS, ChromeConnectInfo(name: "drift")); - _newChromeConnection(port as MessagePort); - } - - void _newChromeConnection(MessagePort clientPort) async { - print("Chrome connect EVENT: "); - clientPort.start(); - EventStreamProviders.messageEvent - .forTarget(clientPort) - .listen((event) => _messageFromClient(clientPort, event)); } void _newConnection(MessageEvent event) async { - print("Connect EVENT: "); final clientPort = event.ports.toDart[0]; clientPort.start(); EventStreamProviders.messageEvent