diff --git a/dimos/conftest.py b/dimos/conftest.py index 6d7d1f509a..35804c8f62 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -210,10 +210,6 @@ def monitor_threads(request): # HuggingFace safetensors conversion thread - no user cleanup API # https://github.com/huggingface/transformers/issues/29513 "Thread-auto_conversion", - # rpyc spawns per-call response threads inside the connection's - # SpawnThread protocol. They get cleaned up when the connection - # is closed (in the rpyc client's teardown), not per-test. - "RpycSpawnThread-", ] new_threads = [ t diff --git a/dimos/core/coordination/blueprints.py b/dimos/core/coordination/blueprints.py index 7d94555de4..f21ff3fe30 100644 --- a/dimos/core/coordination/blueprints.py +++ b/dimos/core/coordination/blueprints.py @@ -141,6 +141,10 @@ def create(cls, module: type[ModuleBase], kwargs: dict[str, Any]) -> Self: ) +# These fields cannot be pickled. +_PROXY_FIELDS = ("transport_map", "global_config_overrides", "remapping_map") + + @dataclass(frozen=True) class Blueprint: blueprints: tuple[BlueprintAtom, ...] @@ -155,6 +159,18 @@ class Blueprint: requirement_checks: tuple[Callable[[], str | None], ...] = field(default_factory=tuple) configurator_checks: "tuple[SystemConfigurator, ...]" = field(default_factory=tuple) + def __getstate__(self) -> dict[str, Any]: + state = self.__dict__.copy() + state.pop("active_blueprints", None) # recomputable cached_property + for name in _PROXY_FIELDS: + state[name] = dict(state[name]) + return state + + def __setstate__(self, state: dict[str, Any]) -> None: + for name in _PROXY_FIELDS: + state[name] = MappingProxyType(state[name]) + self.__dict__.update(state) + @classmethod def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": blueprint = BlueprintAtom.create(module, kwargs) diff --git a/dimos/core/coordination/coordinator_rpc.py b/dimos/core/coordination/coordinator_rpc.py new file mode 100644 index 0000000000..a74eaacacb --- /dev/null +++ b/dimos/core/coordination/coordinator_rpc.py @@ -0,0 +1,88 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from dimos.protocol.rpc.pubsubrpc import LCMRPC +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.protocol.rpc.spec import RPCInspectable + +logger = setup_logger() + + +class CoordinatorRPC: + """Owns the LCM RPC connection to the singleton Coordinator service.""" + + NAME = "Coordinator" + + def __init__(self, rpc: LCMRPC) -> None: + self._rpc = rpc + + @classmethod + def serve(cls, coordinator: RPCInspectable) -> CoordinatorRPC: + """Publish `coordinator`'s @rpc methods under the `Coordinator/` prefix.""" + cls._ensure_no_existing_service() + rpc = LCMRPC() + rpc.serve_module_rpc(coordinator, name=cls.NAME) + rpc.start() + return cls(rpc) + + @classmethod + def connect(cls, *, timeout: float) -> CoordinatorRPC: + """Attach to a running Coordinator, raising `TimeoutError` if none answers.""" + rpc = LCMRPC() + rpc.start() + client = cls(rpc) + try: + client.call("ping", rpc_timeout=timeout) + except BaseException: + rpc.stop() + raise + return client + + def call(self, method: str, *args: Any, rpc_timeout: float | None = None, **kwargs: Any) -> Any: + """Invoke `Coordinator/` and return its result.""" + result, _unsub = self._rpc.call_sync( + f"{self.NAME}/{method}", + ([*args], kwargs), + rpc_timeout=rpc_timeout, + ) + return result + + @property + def rpc(self) -> LCMRPC: + return self._rpc + + def stop(self) -> None: + try: + self._rpc.stop() + except Exception: + logger.error("Error closing Coordinator RPC service", exc_info=True) + + @classmethod + def _ensure_no_existing_service(cls) -> None: + probe = LCMRPC() + probe.start() + try: + try: + probe.call_sync(f"{cls.NAME}/ping", ([], {}), rpc_timeout=0.5) + except TimeoutError: + return + raise RuntimeError(f"another {cls.NAME} service is already running on this LCM bus") + finally: + probe.stop() diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index 9ac46e3c20..1821fe3824 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -15,7 +15,7 @@ from __future__ import annotations from collections import defaultdict -from collections.abc import Mapping, MutableMapping +from collections.abc import Callable, Mapping, MutableMapping import importlib import inspect import shutil @@ -23,7 +23,7 @@ import threading from typing import TYPE_CHECKING, Any, cast -from dimos.core.coordination.rpyc_server import RpycServer +from dimos.core.coordination.coordinator_rpc import CoordinatorRPC from dimos.core.coordination.worker_manager import WorkerManager from dimos.core.coordination.worker_manager_docker import WorkerManagerDocker from dimos.core.coordination.worker_manager_python import WorkerManagerPython @@ -31,6 +31,7 @@ from dimos.core.module import ModuleBase, ModuleSpec from dimos.core.resource import Resource from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport +from dimos.porcelain.remote_module_source import ModuleDescriptor from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance from dimos.utils.generic import short_id from dimos.utils.logging_config import setup_logger @@ -63,7 +64,7 @@ def __init__( self._module_transports: dict[type[ModuleBase], dict[str, PubSubTransport[Any]]] = {} self._started = False self._modules_lock = threading.RLock() - self._rpyc = RpycServer(self) + self._coordinator_rpc: CoordinatorRPC | None = None def start(self) -> None: from dimos.core.o3dpickle import register_picklers @@ -74,7 +75,9 @@ def start(self) -> None: self._started = True def stop(self) -> None: - self._rpyc.stop() + if self._coordinator_rpc is not None: + self._coordinator_rpc.stop() + self._coordinator_rpc = None for module_class, module in reversed(self._deployed_modules.items()): logger.info("Stopping module...", module=module_class.__name__) @@ -92,25 +95,50 @@ def _stop_manager(m: WorkerManager) -> None: safe_thread_map(tuple(self._managers.values()), _stop_manager) - def start_rpyc_service(self) -> int: - return self._rpyc.start() + def start_rpc_service(self) -> None: + """Expose the coordinator's API as @rpc methods over LCM.""" + if self._coordinator_rpc is not None: + return + self._coordinator_rpc = CoordinatorRPC.serve(self) - def list_module_names(self) -> list[str]: + @property + def rpcs(self) -> dict[str, Callable[..., Any]]: + """Methods exposed via the Coordinator @rpc service.""" + return { + "ping": self.ping, + "list_modules": self.list_modules, + "load_blueprint_by_name": self.load_blueprint_by_name, + "load_blueprint": self.load_blueprint, + "restart_module_by_class_name": self.restart_module_by_class_name, + } + + def ping(self) -> str: + """Used by clients to check if the coordinator is alive and responsive.""" + return "pong" + + def list_modules(self) -> list[ModuleDescriptor]: with self._modules_lock: - return [cls.__name__ for cls in self._deployed_modules] + descriptors: list[ModuleDescriptor] = [] + for cls in self._deployed_modules: + qualified = f"{cls.__module__}.{cls.__name__}" + descriptors.append( + ModuleDescriptor( + class_name=cls.__name__, + qualified_path=qualified, + rpc_names=list(cls.rpcs.keys()), + ) + ) + return descriptors - def get_module_endpoint(self, class_name: str) -> tuple[str, int, int]: - """Return (host, worker_rpyc_port, module_id) for the given class name. + def load_blueprint_by_name(self, name: str) -> None: + # Avoid circular import. + from dimos.robot.get_all_blueprints import get_by_name - Lazily starts the worker-side RPyC server on first use. - """ + self.load_blueprint(get_by_name(name)) + + def list_module_names(self) -> list[str]: with self._modules_lock: - for cls, proxy in self._deployed_modules.items(): - if cls.__name__ == class_name: - actor = cast("ModuleProxy", proxy).actor_instance - port = actor.start_rpyc() - return ("localhost", int(port), int(actor._module_id)) - raise KeyError(class_name) + return [cls.__name__ for cls in self._deployed_modules] def health_check(self) -> bool: return all(m.health_check() for m in self._managers.values()) @@ -404,11 +432,12 @@ def restart_module_by_class_name( class_name: str, *, reload_source: bool = True, - ) -> ModuleProxyProtocol: + ) -> None: with self._modules_lock: for cls in self._deployed_modules: if cls.__name__ == class_name: - return self._restart_module(cls, reload_source=reload_source) + self._restart_module(cls, reload_source=reload_source) + return raise ValueError(f"No deployed module with class name {class_name!r}") def restart_module( diff --git a/dimos/core/coordination/python_worker.py b/dimos/core/coordination/python_worker.py index 7b2116e47e..aa93c627fe 100644 --- a/dimos/core/coordination/python_worker.py +++ b/dimos/core/coordination/python_worker.py @@ -21,20 +21,15 @@ import signal import sys import threading -import time import traceback from typing import TYPE_CHECKING, Any -from rpyc.utils.server import ThreadedServer - -from dimos.core.coordination.rpyc_services import WorkerRpycService from dimos.core.coordination.worker_messages import ( CallMethodRequest, DeployModuleRequest, GetAttrRequest, SetRefRequest, ShutdownRequest, - StartRpycRequest, SuppressConsoleRequest, UndeployModuleRequest, WorkerRequest, @@ -135,10 +130,6 @@ def set_ref(self, ref: Any) -> ActorFuture: result = self._send_request_to_worker(SetRefRequest(module_id=self._module_id, ref=ref)) return ActorFuture(result) - def start_rpyc(self) -> int: - port: int = self._send_request_to_worker(StartRpycRequest()) - return port - def __getattr__(self, name: str) -> Any: """Proxy attribute access to the worker process.""" if name.startswith("_"): @@ -332,8 +323,6 @@ def _suppress_console_output() -> None: class _WorkerState: instances: dict[int, Any] worker_id: int - rpyc_server: ThreadedServer | None = None - rpyc_thread: threading.Thread | None = None should_stop: bool = False @@ -405,40 +394,7 @@ def _handle_request(request: Any, state: _WorkerState) -> WorkerResponse: _suppress_console_output() return WorkerResponse(result=True) - case StartRpycRequest(): - if state.rpyc_server is not None: - return WorkerResponse(result=state.rpyc_server.port) - WorkerRpycService._instances = state.instances - state.rpyc_server = ThreadedServer( - WorkerRpycService, - port=0, - hostname=global_config.listen_host, - protocol_config={ - "allow_all_attrs": True, - "allow_public_attrs": True, - "allow_pickle": True, - }, - ) - # `ThreadedServer.__init__` binds the socket but `listen()` only - # runs once `start()` executes on the thread, which sets - # `active=True` immediately after. Wait on that flag so callers - # never see a Connection refused before the accept loop is live. - state.rpyc_thread = threading.Thread(target=state.rpyc_server.start, daemon=True) - state.rpyc_thread.start() - deadline = time.monotonic() + 5.0 - while not state.rpyc_server.active: - if not state.rpyc_thread.is_alive(): - raise RuntimeError("rpyc server thread died before listening") - if time.monotonic() > deadline: - raise RuntimeError("rpyc server failed to start listening within 5s") - time.sleep(0.001) - return WorkerResponse(result=state.rpyc_server.port) - case ShutdownRequest(): - if state.rpyc_server is not None: - state.rpyc_server.close() - if state.rpyc_thread is not None: - state.rpyc_thread.join(timeout=5) state.should_stop = True return WorkerResponse(result=True) diff --git a/dimos/core/coordination/rpyc_server.py b/dimos/core/coordination/rpyc_server.py deleted file mode 100644 index 55f339af54..0000000000 --- a/dimos/core/coordination/rpyc_server.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -from threading import Thread -from typing import TYPE_CHECKING - -from rpyc.utils.server import ThreadedServer - -from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT -from dimos.core.coordination.rpyc_services import CoordinatorService -from dimos.core.global_config import global_config -from dimos.utils.logging_config import setup_logger - -if TYPE_CHECKING: - from dimos.core.coordination.module_coordinator import ModuleCoordinator - -logger = setup_logger() - - -class RpycServer: - def __init__(self, coordinator: ModuleCoordinator) -> None: - self._coordinator = coordinator - self._server: ThreadedServer | None = None - self._thread: Thread | None = None - - def start(self) -> int: - """Start the discovery service and return the bound port.""" - # Create a class at runtime because RPyC takes a class, not an object. - bound_service = type( - "_BoundCoordinatorService", - (CoordinatorService,), - {"_coordinator": self._coordinator}, - ) - - self._server = ThreadedServer( - bound_service, - port=0, - hostname=global_config.listen_host, - protocol_config={ - "allow_all_attrs": True, - "allow_public_attrs": True, - "allow_pickle": True, - }, - ) - self._thread = Thread(target=self._server.start, daemon=True, name="coordinator-rpyc") - self._thread.start() - return int(self._server.port) - - def stop(self) -> None: - if self._server is not None: - try: - self._server.close() - except Exception: - logger.error("Error closing coordinator RPyC server", exc_info=True) - self._server = None - - if self._thread is not None: - self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) - if self._thread.is_alive(): - logger.warning("Coordinator RPyC thread did not exit within timeout") - self._thread = None - - @property - def port(self) -> int: - if self._server is None: - return 0 - return int(self._server.port) diff --git a/dimos/core/coordination/rpyc_services.py b/dimos/core/coordination/rpyc_services.py deleted file mode 100644 index da78b059a0..0000000000 --- a/dimos/core/coordination/rpyc_services.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import pickle -from typing import TYPE_CHECKING, Any - -import rpyc - -if TYPE_CHECKING: - from dimos.core.coordination.module_coordinator import ModuleCoordinator - -_CONFIG = { - "allow_all_attrs": True, - "allow_public_attrs": True, - "allow_setattr": True, -} - - -class CoordinatorService(rpyc.Service): # type: ignore[misc] - """RPyC discovery service exposed by the coordinator process. - - Lets an out-of-process `Dimos.connect()` client enumerate deployed - modules and look up the worker RPyC endpoint for each one. - """ - - _coordinator: ModuleCoordinator | None = None - - def on_connect(self, conn: Any) -> None: - conn._config.update(_CONFIG) - - def exposed_list_modules(self) -> list[str]: - assert self._coordinator is not None - return self._coordinator.list_module_names() - - def exposed_get_module_endpoint(self, class_name: str) -> tuple[str, int, int]: - assert self._coordinator is not None - return self._coordinator.get_module_endpoint(class_name) - - def exposed_load_blueprint_by_name(self, name: str) -> None: - # Avoid circular import. - from dimos.robot.get_all_blueprints import get_by_name - - assert self._coordinator is not None - self._coordinator.load_blueprint(get_by_name(name)) - - def exposed_load_blueprint_pickled(self, data: bytes) -> None: - assert self._coordinator is not None - try: - blueprint = pickle.loads(data) - except Exception as e: - raise RuntimeError( - f"Failed to unpickle Blueprint on daemon ({type(e).__name__}: {e}). " - "The blueprint's module classes must be importable on the daemon " - "and all kwargs must be picklable." - ) from e - self._coordinator.load_blueprint(blueprint) - - def exposed_restart_module_by_class_name( - self, class_name: str, reload_source: bool = True - ) -> None: - assert self._coordinator is not None - self._coordinator.restart_module_by_class_name(class_name, reload_source=reload_source) - - -class WorkerRpycService(rpyc.Service): # type: ignore[misc] - _instances: dict[int, Any] = {} - - def on_connect(self, conn: Any) -> None: - conn._config.update(_CONFIG) - - def exposed_get_module(self, module_id: int) -> Any: - return self._instances[module_id] diff --git a/dimos/core/coordination/test_blueprints.py b/dimos/core/coordination/test_blueprints.py index f91d047d91..d8ada76b20 100644 --- a/dimos/core/coordination/test_blueprints.py +++ b/dimos/core/coordination/test_blueprints.py @@ -14,6 +14,7 @@ import pickle +from types import MappingProxyType from typing import Protocol, get_type_hints from pydantic import ValidationError @@ -226,6 +227,24 @@ def test_disabled_module_proxy_pickle_roundtrip() -> None: assert restored.any_method(1, 2, 3) is None +def test_blueprint_pickle_roundtrip() -> None: + blueprint = ( + autoconnect(ModuleA.blueprint(), ModuleB.blueprint()) + .global_config(option1=True, option2=42) + .remappings([(ModuleA, "module_a", ModuleB)]) + ) + + restored = pickle.loads(pickle.dumps(blueprint)) + + assert restored == blueprint + for name in ("transport_map", "global_config_overrides", "remapping_map"): + assert isinstance(getattr(restored, name), MappingProxyType) + assert dict(restored.global_config_overrides) == {"option1": True, "option2": 42} + assert restored.remapping_map[(ModuleA, "module_a")] is ModuleB + with pytest.raises(TypeError): + restored.global_config_overrides["x"] = 1 + + def test_active_blueprints_filters_disabled() -> None: blueprint = autoconnect(ModuleA.blueprint(), ModuleB.blueprint()).disabled_modules(ModuleA) diff --git a/dimos/core/coordination/test_module_coordinator.py b/dimos/core/coordination/test_module_coordinator.py index 9215c16a0d..c1baad17b2 100644 --- a/dimos/core/coordination/test_module_coordinator.py +++ b/dimos/core/coordination/test_module_coordinator.py @@ -25,6 +25,7 @@ DisabledModuleProxy, autoconnect, ) +from dimos.core.coordination.coordinator_rpc import CoordinatorRPC from dimos.core.coordination.module_coordinator import ( ModuleCoordinator, _all_name_types, @@ -775,9 +776,13 @@ def test_restart_preserves_remapped_streams(dynamic_coordinator) -> None: assert source_after.color_image.transport.topic == target.remapped_data.transport.topic -def test_start_rpyc_service(dynamic_coordinator) -> None: - port = dynamic_coordinator.start_rpyc_service() - assert port > 0 +def test_start_rpc_service_responds_to_ping(dynamic_coordinator) -> None: + dynamic_coordinator.start_rpc_service() + client = CoordinatorRPC.connect(timeout=2.0) + try: + assert client.call("ping") == "pong" + finally: + client.stop() def test_list_module_names(dynamic_coordinator) -> None: @@ -785,16 +790,3 @@ def test_list_module_names(dynamic_coordinator) -> None: dynamic_coordinator.load_module(ModuleA) dynamic_coordinator.load_module(ModuleC) assert set(dynamic_coordinator.list_module_names()) == {"ModuleA", "ModuleC"} - - -def test_get_module_endpoint(dynamic_coordinator) -> None: - dynamic_coordinator.load_module(ModuleA) - host, port, module_id = dynamic_coordinator.get_module_endpoint("ModuleA") - assert host == "localhost" - assert port > 0 - assert isinstance(module_id, int) - - -def test_get_module_endpoint_unknown_raises(dynamic_coordinator) -> None: - with pytest.raises(KeyError): - dynamic_coordinator.get_module_endpoint("NoSuchModule") diff --git a/dimos/core/coordination/worker_messages.py b/dimos/core/coordination/worker_messages.py index ecd145668c..8fdb1cd0f2 100644 --- a/dimos/core/coordination/worker_messages.py +++ b/dimos/core/coordination/worker_messages.py @@ -57,11 +57,6 @@ class SuppressConsoleRequest: pass -@dataclass(frozen=True) -class StartRpycRequest: - pass - - @dataclass(frozen=True) class ShutdownRequest: pass @@ -74,7 +69,6 @@ class ShutdownRequest: | CallMethodRequest | UndeployModuleRequest | SuppressConsoleRequest - | StartRpycRequest | ShutdownRequest ) diff --git a/dimos/core/module.py b/dimos/core/module.py index 259118098f..84a68c1c59 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -71,6 +71,18 @@ class SkillInfo: args_schema: str +class PeekNotFound: + """Sentinel returned by `Module.peek_stream` when the named stream is + not present on a module. A class instance survives pickle round-trips so + `Dimos.peek_stream` can `isinstance(result, PeekNotFound)`-test the reply. + """ + + __slots__ = () + + def __repr__(self) -> str: + return "" + + def get_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread | None]: try: running_loop = asyncio.get_running_loop() @@ -767,6 +779,21 @@ def set_transport(self, stream_name: str, transport: Transport) -> bool: # type stream._transport = transport return True + @rpc + def peek_stream(self, stream_name: str, timeout: float) -> Any: + """Return the next emission on a named stream, a `PeekNotFound` + sentinel if no such stream exists, or `None` on timeout/error. + + Used by `Dimos.peek_stream` to scan running modules. + """ + stream = self.outputs.get(stream_name) or self.inputs.get(stream_name) + if stream is None: + return PeekNotFound() + try: + return stream.get_next(timeout) + except Exception: + return None + # called from remote def connect_stream(self, input_name: str, remote_stream: RemoteOut[T]): # type: ignore[no-untyped-def] input_stream = getattr(self, input_name, None) diff --git a/dimos/core/rpc_client.py b/dimos/core/rpc_client.py index 42407f315e..56dcb4e200 100644 --- a/dimos/core/rpc_client.py +++ b/dimos/core/rpc_client.py @@ -12,16 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import asyncio from collections.abc import Callable from typing import TYPE_CHECKING, Any, Protocol -from dimos.core.coordination.python_worker import MethodCallProxy +from dimos.core.coordination.python_worker import Actor, MethodCallProxy from dimos.core.stream import RemoteStream from dimos.protocol.rpc.pubsubrpc import LCMRPC from dimos.protocol.rpc.spec import RPCSpec from dimos.utils.logging_config import setup_logger +if TYPE_CHECKING: + from dimos.core.module import ModuleBase + logger = setup_logger() @@ -95,14 +100,30 @@ def set_transport(self, stream_name: str, transport: Any) -> bool: ... class RPCClient: - def __init__(self, actor_instance, actor_class) -> None: # type: ignore[no-untyped-def] - self.rpc = LCMRPC() + def __init__( + self, + actor_instance: Actor | None, + actor_class: type[ModuleBase], + *, + rpc: LCMRPC | None = None, + ) -> None: + if rpc is None: + self.rpc = LCMRPC() + self._owns_rpc = True + self.rpc.start() + else: + self.rpc = rpc + self._owns_rpc = False self.actor_class = actor_class self.remote_name = actor_class.__name__ self.actor_instance = actor_instance self.rpcs = actor_class.rpcs.keys() - self.rpc.start() - self._unsub_fns = [] # type: ignore[var-annotated] + self._unsub_fns: list = [] # type: ignore[type-arg] + + @classmethod + def remote(cls, actor_class: type[ModuleBase], *, rpc: LCMRPC | None = None) -> RPCClient: + """Build an RPCClient with no parent-side Actor (cross-process clients).""" + return cls(None, actor_class, rpc=rpc) def stop_rpc_client(self) -> None: for unsub in self._unsub_fns: @@ -113,7 +134,7 @@ def stop_rpc_client(self) -> None: self._unsub_fns = [] - if self.rpc: + if self.rpc and self._owns_rpc: self.rpc.stop() self.rpc = None # type: ignore[assignment] @@ -150,6 +171,13 @@ def __getattr__(self, name: str): # type: ignore[no-untyped-def] self.stop_rpc_client, ) + if self.actor_instance is None: + raise AttributeError( + f"{self.remote_name!r} has no @rpc method named {name!r}; " + f"this client was constructed without a parent-side Actor " + f"(remote-mode), so non-@rpc attribute access is unavailable." + ) + # return super().__getattr__(name) # Try to avoid recursion by directly accessing attributes that are known result = self.actor_instance.__getattr__(name) diff --git a/dimos/core/run_registry.py b/dimos/core/run_registry.py index 919412a27b..dbe3df273e 100644 --- a/dimos/core/run_registry.py +++ b/dimos/core/run_registry.py @@ -16,7 +16,7 @@ from __future__ import annotations -from dataclasses import asdict, dataclass, field +from dataclasses import asdict, dataclass, field, fields import json import os from pathlib import Path @@ -45,7 +45,6 @@ class RunEntry: cli_args: list[str] = field(default_factory=list) config_overrides: dict[str, object] = field(default_factory=dict) grpc_port: int = 9877 - rpyc_port: int = 0 original_argv: list[str] = field(default_factory=list) @property @@ -63,9 +62,12 @@ def remove(self) -> None: @classmethod def load(cls, path: Path) -> RunEntry: - """Load a RunEntry from a JSON file.""" + """Load a RunEntry from a JSON file. Unknown keys are dropped so + registry files written by older dimos versions still load. + """ data = json.loads(path.read_text()) - return cls(**data) + valid = {f.name for f in fields(cls)} + return cls(**{k: v for k, v in data.items() if k in valid}) def generate_run_id(blueprint: str) -> str: @@ -189,25 +191,3 @@ def stop_entry(entry: RunEntry, force: bool = False) -> tuple[str, bool]: entry.remove() return (f"Stopped with {sig_name}", True) - - -def get_most_recent_rpyc_port(run_id: str | None = None) -> int: - entry: RunEntry - - if run_id is not None: - entries = [e for e in list_runs(alive_only=True) if e.run_id == run_id] - if not entries: - raise RuntimeError(f"No running DimOS instance with run_id={run_id!r}") - entry = entries[0] - else: - most_recent = get_most_recent(alive_only=True) - if most_recent is None: - raise RuntimeError("No running DimOS instance. Start one with `dimos run `.") - entry = most_recent - - if not entry.rpyc_port: - raise RuntimeError( - f"Run {entry.run_id} has no rpyc_port. Was it started with an older version of dimos?" - ) - - return entry.rpyc_port diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index 3d4ade88d4..22cbca31b3 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -77,7 +77,7 @@ def test_classmethods() -> None: # Check that we have the expected RPC methods assert "navigate_to" in class_rpcs, "navigate_to should be in rpcs" assert "start" in class_rpcs, "start should be in rpcs" - assert len(class_rpcs) == 7 + assert len(class_rpcs) == 8 # Check that the values are callable assert callable(class_rpcs["navigate_to"]), "navigate_to should be callable" diff --git a/dimos/core/test_run_registry.py b/dimos/core/test_run_registry.py deleted file mode 100644 index 06b8ecfdd9..0000000000 --- a/dimos/core/test_run_registry.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from dimos.core.run_registry import RunEntry, get_most_recent_rpyc_port - - -@pytest.fixture(autouse=True) -def _use_tmp_registry(tmp_path, monkeypatch): - """Redirect the registry directory to a temp dir for every test.""" - monkeypatch.setattr("dimos.core.run_registry.REGISTRY_DIR", tmp_path) - - -@pytest.fixture(autouse=True) -def _all_pids_alive(mocker): - """Treat every PID as alive by default; individual tests can override.""" - mocker.patch("dimos.core.run_registry.is_pid_alive", return_value=True) - - -@pytest.fixture() -def make_entry(tmp_path): - """Factory fixture: create and persist a RunEntry, return it.""" - - def _make(run_id="run-1", rpyc_port=18812, **kwargs): - defaults = dict( - pid=9999, - blueprint="bp", - started_at="2026-01-01T00:00:00", - log_dir=str(tmp_path / "logs"), - ) - defaults.update(kwargs) - entry = RunEntry(run_id=run_id, rpyc_port=rpyc_port, **defaults) - entry.save() - return entry - - return _make - - -def test_returns_port_of_most_recent_run(make_entry): - make_entry(run_id="20260101-000000-old", rpyc_port=10001) - make_entry(run_id="20260102-000000-new", rpyc_port=10002) - - assert get_most_recent_rpyc_port() == 10002 - - -def test_returns_port_for_specific_run_id(make_entry): - make_entry(run_id="target", rpyc_port=10001) - make_entry(run_id="other", rpyc_port=10002) - - assert get_most_recent_rpyc_port(run_id="target") == 10001 - - -def test_raises_when_no_running_instances(): - with pytest.raises(RuntimeError, match="No running DimOS instance"): - get_most_recent_rpyc_port() - - -def test_raises_when_run_id_not_found(make_entry): - make_entry(run_id="exists") - - with pytest.raises(RuntimeError, match="No running DimOS instance with run_id"): - get_most_recent_rpyc_port(run_id="nope") - - -def test_raises_when_rpyc_port_is_zero(make_entry): - make_entry(rpyc_port=0) - - with pytest.raises(RuntimeError, match="no rpyc_port"): - get_most_recent_rpyc_port() - - -def test_skips_dead_processes(make_entry, mocker): - alive = make_entry(run_id="alive", rpyc_port=10001, pid=1) - make_entry(run_id="dead", rpyc_port=10002, pid=2) - - mocker.patch( - "dimos.core.run_registry.is_pid_alive", - side_effect=lambda pid: pid == alive.pid, - ) - - assert get_most_recent_rpyc_port() == 10001 diff --git a/dimos/porcelain/conftest.py b/dimos/porcelain/conftest.py index 5060de240a..349dd0beaa 100644 --- a/dimos/porcelain/conftest.py +++ b/dimos/porcelain/conftest.py @@ -50,9 +50,9 @@ def running_app() -> Iterator[Dimos]: @pytest.fixture def client(running_app: Dimos) -> Iterator[Dimos]: - """Rpyc client paired with the per-test `running_app`.""" - port = running_app._coordinator.start_rpyc_service() - instance = Dimos.connect(host="localhost", port=port) + """LCM @rpc client paired with the per-test `running_app`.""" + running_app._coordinator.start_rpc_service() + instance = Dimos.connect_in_process() try: yield instance finally: diff --git a/dimos/porcelain/dimos.py b/dimos/porcelain/dimos.py index 39c75090b9..bb15a6860f 100644 --- a/dimos/porcelain/dimos.py +++ b/dimos/porcelain/dimos.py @@ -22,8 +22,8 @@ from dimos.core.coordination.blueprints import Blueprint from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.global_config import global_config -from dimos.core.module import ModuleBase -from dimos.core.run_registry import get_most_recent_rpyc_port +from dimos.core.module import ModuleBase, PeekNotFound +from dimos.core.run_registry import get_most_recent, list_runs from dimos.porcelain.local_module_source import LocalModuleSource from dimos.porcelain.module_source import ModuleSource from dimos.porcelain.remote_module_source import RemoteModuleSource @@ -99,30 +99,41 @@ def restart(self, module_class: type[ModuleBase], *, reload_source: bool = True) self._coordinator.restart_module(module_class, reload_source=reload_source) @classmethod - def connect( - cls, - *, - run_id: str | None = None, - host: str | None = None, - port: int | None = None, - ) -> Dimos: - """Connect to an already-running DimOS instance. + def connect(cls, *, run_id: str | None = None, timeout: float = 5.0) -> Dimos: + """Connect to an already-running DimOS instance over LCM. With no arguments, finds the most recent alive `RunEntry` in the - registry and connects to its coordinator RPyC endpoint. Use `run_id=` to - select a specific run, or `host=` + `port=` to bypass the registry. + registry and connects via LCM to its `Coordinator` @rpc service. Use + `run_id=` to select a specific run. Returns a `Dimos` instance in read/call mode: `skills`, attribute - access, `__repr__` and `__dir__` work, but `run()` and `restart()` raise - `NotImplementedError`. `stop()` closes the connection without - terminating the remote process. + access, `__repr__` and `__dir__` work, but only methods marked with + `@rpc` (and `@skill`, which implies `@rpc`) on a module are callable. + `stop()` closes the connection without terminating the remote process. """ - if host is not None and port is not None: - source: ModuleSource = RemoteModuleSource(host, port) + if run_id is not None: + entries = [e for e in list_runs(alive_only=True) if e.run_id == run_id] + if not entries: + raise RuntimeError(f"No running DimOS instance with run_id={run_id!r}") else: - rpyc_port = get_most_recent_rpyc_port(run_id=run_id) - source = RemoteModuleSource("localhost", rpyc_port) + if get_most_recent(alive_only=True) is None: + raise RuntimeError( + "No running DimOS instance. Start one with `dimos run `." + ) + + source = RemoteModuleSource(timeout=timeout) + instance = cls() + instance._source = source + return instance + + @classmethod + def connect_in_process(cls, *, timeout: float = 5.0) -> Dimos: + """Connect over LCM without consulting the run registry. + For tests where the coordinator and the client live in the same + process and there is no `RunEntry` on disk. + """ + source = RemoteModuleSource(timeout=timeout) instance = cls() instance._source = source return instance @@ -149,46 +160,38 @@ def peek_stream(self, name: str, timeout: float = 1.0) -> Any: Args: name: Stream attribute name (e.g. "color_image"). - timeout: Max seconds to wait. Capped internally at 25s to stay - under the rpyc sync request timeout. + timeout: Max seconds to wait. """ - effective_timeout = min(timeout, 25.0) with self._lock: source = self._source if source is None: raise RuntimeError("No modules are running") - stream = None - for module_name in source.list_module_names(): + module_names = source.list_module_names() + for module_name in module_names: try: - module = source.get_rpyc_module(module_name) - if name in module.outputs: - stream = module.outputs[name] - break - if name in module.inputs: - stream = module.inputs[name] - break + module = source.get_module(module_name) + peek = getattr(module, "peek_stream", None) + if peek is None: + continue + result = peek(name, timeout) except Exception: continue + if isinstance(result, PeekNotFound): + continue + return result - if stream is None: - raise LookupError( - f"No running module exposes a stream named {name!r}. " - f"Running modules: {source.list_module_names()}" - ) - - try: - return stream.get_next(effective_timeout) - except Exception: - return None + raise LookupError( + f"No running module exposes a stream named {name!r}. Running modules: {module_names}" + ) def stop(self) -> None: """Stop all modules and clean up resources. On a locally-driven `Dimos`, stops the coordinator and workers. - On a connected `Dimos` (from `Dimos.connect()`), closes RPyC - connections without terminating the remote process. + On a connected `Dimos` (from `Dimos.connect()`), closes the LCM RPC + client without terminating the remote process. """ with self._lock: if self._stopped: @@ -221,7 +224,7 @@ def __getattr__(self, name: str) -> Any: raise RuntimeError("No modules are running") try: - return source.get_rpyc_module(name) + return source.get_module(name) except KeyError: pass @@ -257,10 +260,10 @@ def _run_remote(target: str | Blueprint | type[ModuleBase], source: RemoteModule if key in all_modules: source.load_blueprint_by_name(key) else: - source.load_blueprint_pickled(target.blueprint()) + source.load_blueprint(target.blueprint()) return if isinstance(target, Blueprint): - source.load_blueprint_pickled(target) + source.load_blueprint(target) return raise TypeError( f"run() expects a blueprint name (str), Blueprint, or Module class, " diff --git a/dimos/porcelain/local_module_source.py b/dimos/porcelain/local_module_source.py index 81a4a23d57..8dea71154f 100644 --- a/dimos/porcelain/local_module_source.py +++ b/dimos/porcelain/local_module_source.py @@ -14,11 +14,8 @@ from __future__ import annotations -import threading from typing import TYPE_CHECKING, Any -import rpyc - from dimos.porcelain.module_source import ModuleSource from dimos.utils.logging_config import setup_logger @@ -31,49 +28,27 @@ class LocalModuleSource(ModuleSource): """Module source backed by an in-process `ModuleCoordinator`. - Uses per-worker RPyC servers for both attribute access and skill calls, - so the wire path is identical to `RemoteModuleSource`. + Returns the per-module `RPCClient` proxies the coordinator already + maintains for inter-module calls. Method calls flow over the same LCM + bus the modules use to talk to each other. """ is_remote = False def __init__(self, coordinator: ModuleCoordinator) -> None: self._coordinator = coordinator - self._cache: dict[str, tuple[rpyc.Connection, Any]] = {} - self._lock = threading.RLock() def list_module_names(self) -> list[str]: return self._coordinator.list_module_names() - def get_rpyc_module(self, name: str) -> Any: - with self._lock: - cached = self._cache.get(name) - if cached is not None and not cached[0].closed: - return cached[1] - - host, port, module_id = self._coordinator.get_module_endpoint(name) - - conn = rpyc.connect( - host, port, config={"sync_request_timeout": 30, "allow_pickle": True} - ) - module = conn.root.get_module(module_id) - self._cache[name] = (conn, module) - return module + def get_module(self, name: str) -> Any: + for cls, proxy in self._coordinator._deployed_modules.items(): + if cls.__name__ == name: + return proxy + raise KeyError(name) def invalidate(self, name: str) -> None: - with self._lock: - entry = self._cache.pop(name, None) - if entry is not None: - try: - entry[0].close() - except Exception: - logger.warning("Failed to close RPyC connection for module %s", name, exc_info=True) + return None def close(self) -> None: - with self._lock: - for conn, _ in self._cache.values(): - try: - conn.close() - except Exception: - logger.warning("Failed to close RPyC connection during shutdown", exc_info=True) - self._cache.clear() + return None diff --git a/dimos/porcelain/module_source.py b/dimos/porcelain/module_source.py index c6559bddcc..504b4808d0 100644 --- a/dimos/porcelain/module_source.py +++ b/dimos/porcelain/module_source.py @@ -24,6 +24,6 @@ class ModuleSource(Protocol): def list_module_names(self) -> list[str]: ... - def get_rpyc_module(self, name: str) -> Any: ... + def get_module(self, name: str) -> Any: ... def close(self) -> None: ... diff --git a/dimos/porcelain/remote_module_source.py b/dimos/porcelain/remote_module_source.py index 4edfe9dffc..b8a5174e62 100644 --- a/dimos/porcelain/remote_module_source.py +++ b/dimos/porcelain/remote_module_source.py @@ -14,114 +14,139 @@ from __future__ import annotations -import copyreg -import pickle +import importlib import threading -import time -from types import MappingProxyType -from typing import TYPE_CHECKING, Any - -import rpyc +from typing import TYPE_CHECKING, Any, NamedTuple +from dimos.core.coordination.coordinator_rpc import CoordinatorRPC +from dimos.core.rpc_client import RpcCall, RPCClient from dimos.porcelain.module_source import ModuleSource from dimos.utils.logging_config import setup_logger -_CONNECT_RETRY_DEADLINE_S = 2.0 - if TYPE_CHECKING: from dimos.core.coordination.blueprints import Blueprint + from dimos.protocol.rpc.pubsubrpc import LCMRPC logger = setup_logger() -# `Blueprint` carries `mappingproxy` fields (e.g. `global_config_overrides`, -# `remapping_map`) that are not picklable by default — `MappingProxyType` lives -# at `builtins.mappingproxy` which isn't a real builtins attribute, so pickle -# can't reference the constructor by name. Round-trip through a module-level -# factory so `load_blueprint_pickled` can ship blueprints over RPyC. -def _rebuild_mappingproxy(d: dict) -> MappingProxyType: # type: ignore[type-arg] - return MappingProxyType(d) +class _RemoteProxy: + """Names-only proxy for a remote module whose class can't be imported. + + Exposes only the @rpc methods the remote daemon advertised; any other + attribute access raises `AttributeError`. + """ + + def __init__(self, rpc: LCMRPC, remote_name: str, rpc_names: set[str]) -> None: + self._rpc = rpc + self._remote_name = remote_name + self._rpc_names = rpc_names + self._unsub_fns: list = [] # type: ignore[type-arg] + def __getattr__(self, name: str) -> Any: + if name.startswith("_"): + raise AttributeError(name) + if name not in self._rpc_names: + raise AttributeError(f"{self._remote_name!r} has no @rpc method named {name!r}") + return RpcCall(None, self._rpc, name, self._remote_name, self._unsub_fns, None) -def _reduce_mappingproxy(m: MappingProxyType) -> tuple[Any, ...]: # type: ignore[type-arg] - return (_rebuild_mappingproxy, (dict(m),)) + def __dir__(self) -> list[str]: + return sorted(self._rpc_names) -copyreg.pickle(MappingProxyType, _reduce_mappingproxy) # type: ignore[arg-type] +class ModuleDescriptor(NamedTuple): + """Returned by `Coordinator/list_modules` so a remote client can build a proxy.""" + + class_name: str + qualified_path: str + rpc_names: list[str] class RemoteModuleSource(ModuleSource): - """Module source backed by a remote `CoordinatorService` RPyC endpoint.""" + """Module source that drives a remote daemon over the Coordinator @rpc service.""" is_remote = True - def __init__(self, host: str, port: int) -> None: - self._coord_conn = _rpyc_connect( - host, port, config={"sync_request_timeout": 30, "allow_pickle": True} - ) - self._cache: dict[str, tuple[rpyc.Connection, Any]] = {} + def __init__(self, *, timeout: float = 5.0) -> None: + self._timeout = timeout + self._cache: dict[str, RPCClient | _RemoteProxy] = {} + self._descriptors: dict[str, ModuleDescriptor] | None = None self._lock = threading.RLock() + try: + self._coord = CoordinatorRPC.connect(timeout=timeout) + except TimeoutError: + raise RuntimeError( + "No running DimOS instance. Start one with `dimos run `." + ) from None + + def _refresh_descriptors(self) -> dict[str, ModuleDescriptor]: + descriptors = self._coord.call("list_modules") + self._descriptors = {d.class_name: d for d in descriptors} + return self._descriptors + + def _get_descriptor(self, name: str) -> ModuleDescriptor: + with self._lock: + if self._descriptors is None or name not in self._descriptors: + self._refresh_descriptors() + assert self._descriptors is not None + if name not in self._descriptors: + raise KeyError(name) + return self._descriptors[name] + def list_module_names(self) -> list[str]: - return list(self._coord_conn.root.list_modules()) + with self._lock: + descriptors = self._refresh_descriptors() + return list(descriptors.keys()) - def get_rpyc_module(self, name: str) -> Any: + def get_module(self, name: str) -> Any: with self._lock: cached = self._cache.get(name) - if cached is not None and not cached[0].closed: - return cached[1] - - endpoint = self._coord_conn.root.get_module_endpoint(name) - host, port, module_id = endpoint[0], int(endpoint[1]), int(endpoint[2]) - conn = _rpyc_connect( - host, port, config={"sync_request_timeout": 30, "allow_pickle": True} - ) - module = conn.root.get_module(module_id) - self._cache[name] = (conn, module) - return module + if cached is not None: + return cached + + descriptor = self._get_descriptor(name) + proxy: RPCClient | _RemoteProxy + try: + module_path, class_name = descriptor.qualified_path.rsplit(".", 1) + cls = getattr(importlib.import_module(module_path), class_name) + proxy = RPCClient(None, cls, rpc=self._coord.rpc) + except (ImportError, AttributeError): + proxy = _RemoteProxy(self._coord.rpc, name, set(descriptor.rpc_names)) + self._cache[name] = proxy + return proxy def invalidate(self, name: str) -> None: with self._lock: entry = self._cache.pop(name, None) - if entry is not None: + self._descriptors = None + if isinstance(entry, RPCClient): try: - entry[0].close() + entry.stop_rpc_client() except Exception: - logger.warning("Failed to close RPyC connection for module %s", name, exc_info=True) + logger.warning("Failed to release proxy for %s", name, exc_info=True) def load_blueprint_by_name(self, name: str) -> None: - self._coord_conn.root.load_blueprint_by_name(name) + self._coord.call("load_blueprint_by_name", name) - def load_blueprint_pickled(self, blueprint: Blueprint) -> None: - data = pickle.dumps(blueprint) - self._coord_conn.root.load_blueprint_pickled(data) + def load_blueprint(self, blueprint: Blueprint) -> None: + self._coord.call("load_blueprint", blueprint) def restart_module_by_class_name(self, class_name: str, *, reload_source: bool) -> None: - self._coord_conn.root.restart_module_by_class_name(class_name, reload_source) + self._coord.call("restart_module_by_class_name", class_name, reload_source=reload_source) self.invalidate(class_name) def close(self) -> None: with self._lock: - for conn, _ in self._cache.values(): - try: - conn.close() - except Exception: - pass + for entry in self._cache.values(): + if isinstance(entry, RPCClient): + try: + entry.stop_rpc_client() + except Exception: + pass self._cache.clear() + self._descriptors = None try: - self._coord_conn.close() + self._coord.stop() except Exception: pass - - -def _rpyc_connect(host: str, port: int, **kwargs: Any) -> rpyc.Connection: - deadline = time.monotonic() + _CONNECT_RETRY_DEADLINE_S - delay = 0.010 - while True: - try: - return rpyc.connect(host, port, **kwargs) - except ConnectionRefusedError: - if time.monotonic() >= deadline: - raise - time.sleep(delay) - delay = min(delay * 2, 0.200) diff --git a/dimos/porcelain/skills_proxy.py b/dimos/porcelain/skills_proxy.py index 3332a888dd..86e9d267b7 100644 --- a/dimos/porcelain/skills_proxy.py +++ b/dimos/porcelain/skills_proxy.py @@ -63,7 +63,7 @@ def _build_cache(self) -> None: errors: dict[str, BaseException] = {} for name in names: try: - module_proxy = self._source.get_rpyc_module(name) + module_proxy = self._source.get_module(name) skills = list(module_proxy.get_skills()) except Exception as e: logger.warning("Failed to enumerate skills for module %s", name, exc_info=True) diff --git a/dimos/porcelain/test_dimos.py b/dimos/porcelain/test_dimos.py index 83188b14f6..b9ab867195 100644 --- a/dimos/porcelain/test_dimos.py +++ b/dimos/porcelain/test_dimos.py @@ -159,10 +159,9 @@ def test_run_blueprint_object(app): assert app.is_running -def test_rpyc_module_access(running_app): +def test_module_rpc_call(running_app): module = running_app.StressTestModule - # Access an attribute from ModuleBase - assert module._module_closed is False + assert module.ping() == "pong" def test_dir_lists_modules(running_app): diff --git a/dimos/porcelain/test_local_module_source.py b/dimos/porcelain/test_local_module_source.py index e9a43fb4e9..1b10f2a460 100644 --- a/dimos/porcelain/test_local_module_source.py +++ b/dimos/porcelain/test_local_module_source.py @@ -14,6 +14,8 @@ from __future__ import annotations +import pytest + from dimos.porcelain.local_module_source import LocalModuleSource @@ -27,35 +29,24 @@ def test_list_module_names(running_app): assert "StressTestModule" in names -def test_get_rpyc_module(running_app): - module = running_app._source.get_rpyc_module("StressTestModule") - assert module._module_closed is False +def test_get_module_returns_callable_proxy(running_app): + module = running_app._source.get_module("StressTestModule") + assert module.ping() == "pong" -def test_get_rpyc_module_caches_connection(running_app): +def test_get_module_returns_same_proxy(running_app): source = running_app._source - m1 = source.get_rpyc_module("StressTestModule") - m2 = source.get_rpyc_module("StressTestModule") + m1 = source.get_module("StressTestModule") + m2 = source.get_module("StressTestModule") assert m1 is m2 -def test_invalidate_allows_reconnect(running_app): - source = running_app._source - m1 = source.get_rpyc_module("StressTestModule") - source.invalidate("StressTestModule") - m2 = source.get_rpyc_module("StressTestModule") - assert m1 is not m2 - assert m2._module_closed is False +def test_get_module_unknown_raises(running_app): + with pytest.raises(KeyError): + running_app._source.get_module("NonexistentModule") -def test_invalidate_unknown_name_is_noop(running_app): +def test_invalidate_is_noop(running_app): + # Coordinator owns the proxy; the source has no per-call cache. + running_app._source.invalidate("StressTestModule") running_app._source.invalidate("NonexistentModule") - - -def test_close_then_get_reconnects(running_app): - source = running_app._source - m1 = source.get_rpyc_module("StressTestModule") - source.close() - m2 = source.get_rpyc_module("StressTestModule") - assert m1 is not m2 - assert m2._module_closed is False diff --git a/dimos/porcelain/test_remote_module_source.py b/dimos/porcelain/test_remote_module_source.py index 47bb883e91..79cd4f51af 100644 --- a/dimos/porcelain/test_remote_module_source.py +++ b/dimos/porcelain/test_remote_module_source.py @@ -14,10 +14,13 @@ from __future__ import annotations +import importlib + import pytest from dimos.core.tests.stress_test_module import StressTestModule from dimos.porcelain.dimos import Dimos +from dimos.porcelain.remote_module_source import _RemoteProxy def test_connect_no_running_system(tmp_path, monkeypatch): @@ -25,10 +28,10 @@ def test_connect_no_running_system(tmp_path, monkeypatch): monkeypatch.setattr(run_registry, "REGISTRY_DIR", tmp_path / "runs") with pytest.raises(RuntimeError, match="No running DimOS instance"): - Dimos.connect() + Dimos.connect(timeout=0.5) -def test_connect_via_host_port_skill_call(running_app, client): +def test_connect_skill_call(running_app, client): assert client.skills.ping() == "pong" assert client.skills.echo(message="hello") == "hello" client.stop() @@ -36,16 +39,16 @@ def test_connect_via_host_port_skill_call(running_app, client): assert running_app.skills.ping() == "pong" -def test_connect_attribute_access(client): +def test_connect_rpc_method_call(client): module = client.StressTestModule - assert module._module_closed is False + assert module.ping() == "pong" def test_connect_restart_invalidates_cache(client): source = client._source - m_before = source.get_rpyc_module("StressTestModule") + m_before = source.get_module("StressTestModule") client.restart(StressTestModule, reload_source=False) - m_after = source.get_rpyc_module("StressTestModule") + m_after = source.get_module("StressTestModule") assert m_before is not m_after assert client.skills.ping() == "pong" @@ -74,8 +77,24 @@ def test_connect_list_module_names(client): assert "StressTestModule" in names -def test_connect_get_rpyc_module_caches(client): +def test_connect_get_module_caches(client): source = client._source - m1 = source.get_rpyc_module("StressTestModule") - m2 = source.get_rpyc_module("StressTestModule") + m1 = source.get_module("StressTestModule") + m2 = source.get_module("StressTestModule") assert m1 is m2 + + +def test_remote_proxy_fallback_when_class_unimportable(client, monkeypatch): + """If `importlib.import_module` raises, get_module returns a names-only proxy.""" + real_import = importlib.import_module + + def fake_import(name: str, *args, **kwargs): # type: ignore[no-untyped-def] + if "stress_test_module" in name: + raise ImportError(name) + return real_import(name, *args, **kwargs) + + monkeypatch.setattr("dimos.porcelain.remote_module_source.importlib.import_module", fake_import) + client._source.invalidate("StressTestModule") + proxy = client._source.get_module("StressTestModule") + assert isinstance(proxy, _RemoteProxy) + assert proxy.ping() == "pong" diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 2e24cdb664..738677b29f 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -316,7 +316,7 @@ def run( daemonize(log_dir) - rpyc_port = coordinator.start_rpyc_service() # After daemonize(). + coordinator.start_rpc_service() # After daemonize(). entry = RunEntry( run_id=run_id, pid=os.getpid(), @@ -325,7 +325,6 @@ def run( log_dir=str(log_dir), cli_args=list(robot_types), config_overrides=cli_config_overrides, - rpyc_port=rpyc_port, original_argv=sys.argv, ) entry.save() @@ -333,7 +332,7 @@ def run( install_signal_handlers(entry, coordinator) coordinator.loop() else: - rpyc_port = coordinator.start_rpyc_service() + coordinator.start_rpc_service() entry = RunEntry( run_id=run_id, pid=os.getpid(), @@ -342,7 +341,6 @@ def run( log_dir=str(log_dir), cli_args=list(robot_types), config_overrides=cli_config_overrides, - rpyc_port=rpyc_port, original_argv=sys.argv, ) entry.save() diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 4083def93b..a5bdff890a 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -114,12 +114,12 @@ def _camera_info_static() -> CameraInfo: def make_connection(ip: str | None, cfg: GlobalConfig) -> Go2ConnectionProtocol: - connection_type = cfg.unitree_connection_type + connection_type = cfg.unitree_connection_type.lower() if ip in ("fake", "mock", "replay") or connection_type == "replay": dataset = cfg.replay_db return ReplayConnection(dataset=dataset) - elif ip == "mujoco" or connection_type == "mujoco": + elif ip == "mujoco" or connection_type in ("mujoco", "true"): from dimos.robot.unitree.mujoco_connection import MujocoConnection return MujocoConnection(cfg) diff --git a/docs/usage/python-api.md b/docs/usage/python-api.md index 8eb317e623..6e139a3bc4 100644 --- a/docs/usage/python-api.md +++ b/docs/usage/python-api.md @@ -26,9 +26,6 @@ print(app.skills) # Access a module directly. app.ReplanningAStarPlanner -# Access a private variable. -print(app.ReplanningAStarPlanner._planner._safe_goal_clearance) - # Add another module dynamically. from dimos.robot.unitree.keyboard_teleop import KeyboardTeleop app.run(KeyboardTeleop) @@ -51,15 +48,11 @@ subscriber: img = app.peek_stream("color_image", 1.0) # Display it in a window. -import cv2, numpy as np -cv2.imshow("color_image", np.array(img.data)) +import cv2 +cv2.imshow("color_image", img.data) cv2.waitKey(0) ``` -Note, `np.array` is used to turn it into a real numpy array. `img.data` is a -proxy object. That works in most cases, but `cv2.imshow` checks the actual -class, so it needs a real numpy array. - ## Remote mode Start a daemon first (via CLI or another script), then connect to it: @@ -85,11 +78,11 @@ Connect to a specific instance: ```python skip # By run ID (from `dimos status`) app = Dimos.connect(run_id="20260306-143022-unitree-go2") - -# By host and port -app = Dimos.connect(host="192.168.1.50", port=18861) ``` +`Dimos.connect()` discovers the daemon over the local LCM bus. Cross-host +connection is set with the standard `LCM_DEFAULT_URL` environment variable. + `run()` and `restart()` also work against a daemon: ```python skip @@ -107,7 +100,7 @@ be picklable. ## Limitations -- `stop()` on a connected instance closes the RPyC connection but does not terminate the remote process. Use `dimos stop` for that. +- `stop()` on a connected instance closes the LCM connection but does not terminate the remote process. Use `dimos stop` for that. ## Restarting modules diff --git a/pyproject.toml b/pyproject.toml index 93c3d32d34..fc1fad850e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,7 +126,6 @@ dependencies = [ "psutil>=7.0.0", "sqlite-vec>=0.1.6", "lz4>=4.4.5", - "rpyc>=6.0.0", "bleak>=3.0.2", "cryptography>=46.0.5", ] @@ -531,8 +530,6 @@ module = [ "plum.*", "portal", "psutil", - "rpyc", - "rpyc.*", "pycuda", "pycuda.*", "pydrake", @@ -540,6 +537,7 @@ module = [ "pyzed", "pyzed.*", "rclpy.*", + "reportlab.*", "sam2.*", "scipy", "scipy.*", diff --git a/uv.lock b/uv.lock index 39aa2b2984..aa01220788 100644 --- a/uv.lock +++ b/uv.lock @@ -1858,7 +1858,6 @@ dependencies = [ { name = "pyturbojpeg" }, { name = "reactivex" }, { name = "rerun-sdk" }, - { name = "rpyc" }, { name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "scipy", version = "1.17.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "sortedcontainers" }, @@ -2443,7 +2442,6 @@ requires-dist = [ { name = "rerun-sdk", specifier = ">=0.20.0" }, { name = "rerun-sdk", marker = "extra == 'docker'" }, { name = "rerun-sdk", marker = "extra == 'visualization'", specifier = ">=0.20.0" }, - { name = "rpyc", specifier = ">=6.0.0" }, { name = "scikit-learn", marker = "extra == 'misc'" }, { name = "scipy", specifier = ">=1.15.1" }, { name = "scipy", marker = "extra == 'docker'", specifier = ">=1.15.1" }, @@ -7521,18 +7519,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2b/31/21609a9be48e877bc33b089a7f495c853215def5aeb9564a31c210d9d769/plum_dispatch-2.5.7-py3-none-any.whl", hash = "sha256:06471782eea0b3798c1e79dca2af2165bafcfa5eb595540b514ddd81053b1ede", size = 42612, upload-time = "2025-01-17T20:07:26.461Z" }, ] -[[package]] -name = "plumbum" -version = "1.10.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "pywin32", marker = "platform_python_implementation != 'PyPy' and sys_platform == 'win32'" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/dc/c8/11a5f792704b70f071a3dbc329105a98e9cc8d25daaf09f733c44eb0ef8e/plumbum-1.10.0.tar.gz", hash = "sha256:f8cbf0ecec0b73ff4e349398b65112a9e3f9300e7dc019001217dcc148d5c97c", size = 320039, upload-time = "2025-10-31T05:02:48.697Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/79/ad/45312df6b63ba64ea35b8d8f5f0c577aac16e6b416eafe8e1cb34e03f9a7/plumbum-1.10.0-py3-none-any.whl", hash = "sha256:9583d737ac901c474d99d030e4d5eec4c4e6d2d7417b1cf49728cf3be34f6dc8", size = 127383, upload-time = "2025-10-31T05:02:47.002Z" }, -] - [[package]] name = "polars" version = "1.38.1" @@ -9474,18 +9460,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/b7/b95708304cd49b7b6f82fdd039f1748b66ec2b21d6a45180910802f1abf1/rpds_py-0.30.0-pp311-pypy311_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:ac37f9f516c51e5753f27dfdef11a88330f04de2d564be3991384b2f3535d02e", size = 562191, upload-time = "2025-11-30T20:24:36.853Z" }, ] -[[package]] -name = "rpyc" -version = "6.0.2" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "plumbum" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/8b/e7/1c17410673b634f4658bb5d2232d0c4507432a97508b2c6708e59481644a/rpyc-6.0.2.tar.gz", hash = "sha256:8e780a6a71b842128a80a337c64adfb6f919014e069951832161c9efc630c93b", size = 62321, upload-time = "2025-04-18T16:33:21.693Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/3f/99/2e119d541d596daea39643eb9312b47c7847383951300f889166938035b1/rpyc-6.0.2-py3-none-any.whl", hash = "sha256:8072308ad30725bc281c42c011fc8c922be15f3eeda6eafb2917cafe1b6f00ec", size = 74768, upload-time = "2025-04-18T16:33:20.147Z" }, -] - [[package]] name = "ruff" version = "0.14.3"