Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions dimos/robot/unitree/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import asyncio
from dataclasses import dataclass
import functools
import os
import threading
import time
from typing import Any, TypeAlias
Expand Down Expand Up @@ -85,12 +86,17 @@ def to_ndarray(self, format=None): # type: ignore[no-untyped-def]
class UnitreeWebRTCConnection(Resource):
_SPORT_API_ID_RAGEMODE: int = 2059

def __init__(self, ip: str, mode: str = "ai") -> None:
def __init__(self, ip: str, mode: str = "ai", aes_128_key: str | None = None) -> None:
self.ip = ip
self.mode = mode
self.stop_timer: threading.Timer | None = None
self.cmd_vel_timeout = 0.2
self.conn = LegionConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip)
# Per-device AES-128 key required by G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also include GO2 firmware 1.1.15 as it has the same issue and this should fix it for the GO2 too !

# Fetch with: unitree-fetch-aes-key --email YOU --sn <serial>
if not aes_128_key:
aes_128_key = os.environ.get("UNITREE_AES_128_KEY")
extra: dict[str, Any] = {"aes_128_key": aes_128_key} if aes_128_key else {}
self.conn = LegionConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip, **extra)
self.connect()

def connect(self) -> None:
Expand Down
7 changes: 6 additions & 1 deletion dimos/robot/unitree/g1/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
class G1Config(ModuleConfig):
ip: str = Field(default_factory=lambda m: m["g"].robot_ip)
connection_type: str = Field(default_factory=lambda m: m["g"].unitree_connection_type)
# Per-device AES-128 key for G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
# If unset here, UnitreeWebRTCConnection falls back to the UNITREE_AES_128_KEY env var.
aes_128_key: str | None = None


class G1ConnectionBase(Module, ABC):
Expand Down Expand Up @@ -78,7 +81,9 @@ def start(self) -> None:

match self.config.connection_type:
case "webrtc":
self.connection = UnitreeWebRTCConnection(self.config.ip)
self.connection = UnitreeWebRTCConnection(
self.config.ip, aes_128_key=self.config.aes_128_key
)
case "replay":
raise ValueError("Replay connection not implemented for G1 robot")
case "mujoco":
Expand Down
9 changes: 8 additions & 1 deletion dimos/robot/unitree/g1/effectors/high_level/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
class G1HighLevelWebRtcConfig(ModuleConfig):
ip: str | None = None
connection_mode: str = "ai"
# Per-device AES-128 key for G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
# If unset here, UnitreeWebRTCConnection falls back to the UNITREE_AES_128_KEY env var.
aes_128_key: str | None = None


class G1HighLevelWebRtc(Module, HighLevelG1Spec):
Expand All @@ -62,7 +65,11 @@ def __init__(self, *args: Any, g: GlobalConfig = global_config, **kwargs: Any) -
def start(self) -> None:
super().start()
assert self.config.ip is not None, "ip must be set in G1HighLevelWebRtcConfig"
self.connection = UnitreeWebRTCConnection(self.config.ip, self.config.connection_mode)
self.connection = UnitreeWebRTCConnection(
self.config.ip,
self.config.connection_mode,
aes_128_key=self.config.aes_128_key,
)
self.connection.start()
self.register_disposable(Disposable(self.cmd_vel.subscribe(self.move)))

Expand Down
15 changes: 12 additions & 3 deletions dimos/robot/unitree/go2/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Go2Mode(str, Enum):
class ConnectionConfig(ModuleConfig):
ip: str = Field(default_factory=lambda m: m["g"].robot_ip)
mode: Go2Mode = Go2Mode.DEFAULT
# Per-device AES-128 key for Go2 firmware >= 1.1.15 (data2=3 WebRTC handshake).
# If unset here, UnitreeWebRTCConnection falls back to the UNITREE_AES_128_KEY env var.
aes_128_key: str | None = None


class Go2ConnectionProtocol(Protocol):
Expand Down Expand Up @@ -113,7 +116,11 @@ def _camera_info_static() -> CameraInfo:
)


def make_connection(ip: str | None, cfg: GlobalConfig) -> Go2ConnectionProtocol:
def make_connection(
ip: str | None,
cfg: GlobalConfig,
aes_128_key: str | None = None,
) -> Go2ConnectionProtocol:
connection_type = cfg.unitree_connection_type

if ip in ("fake", "mock", "replay") or connection_type == "replay":
Expand All @@ -129,7 +136,7 @@ def make_connection(ip: str | None, cfg: GlobalConfig) -> Go2ConnectionProtocol:
return DimSimConnection(cfg)
elif connection_type == "webrtc":
assert ip is not None, "IP address must be provided"
return UnitreeWebRTCConnection(ip)
return UnitreeWebRTCConnection(ip, aes_128_key=aes_128_key)
else:
raise ValueError(f"Unknown simulator {cfg.simulation!r}. Choose from: mujoco, dimsim")

Expand Down Expand Up @@ -221,7 +228,9 @@ def rerun_views(cls): # type: ignore[no-untyped-def]

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.connection = make_connection(self.config.ip, self.config.g)
self.connection = make_connection(
self.config.ip, self.config.g, aes_128_key=self.config.aes_128_key
)

if hasattr(self.connection, "camera_info_static"):
self.camera_info_static = self.connection.camera_info_static
Expand Down
7 changes: 6 additions & 1 deletion dimos/robot/unitree/go2/fleet_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ def __init__(self, **kwargs: Any) -> None:
def start(self) -> None:
self._extra_connections.clear()
for ip in self._extra_ips:
conn = make_connection(ip, self.config.g)
# Forward the configured key to every follower. The AES-128 key is
# technically per-device, so a heterogeneous fleet needs each robot's
# own key; that's a future per-IP mapping. For homogeneous-key or
# env-var-driven setups this matches the primary's behaviour and
# avoids leaving followers reliant on UNITREE_AES_128_KEY alone.
conn = make_connection(ip, self.config.g, aes_128_key=self.config.aes_128_key)
conn.start()
self._extra_connections.append(conn)

Expand Down
50 changes: 50 additions & 0 deletions dimos/robot/unitree/go2/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Targeted test for go2.connection.make_connection's aes_128_key forwarding.

Pins the wiring added in PR #2117 so renaming the kwarg in either
UnitreeWebRTCConnection or make_connection without updating the other
fails loudly. The leaf (UnitreeWebRTCConnection.__init__) is exercised
in dimos/robot/unitree/test_connection.py — this file only covers the
go2-local routing.
"""

from types import SimpleNamespace
from unittest.mock import MagicMock

import pytest

from dimos.robot.unitree.go2 import connection as go2_conn


@pytest.fixture
def stub_webrtc(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
"""Replace UnitreeWebRTCConnection in go2.connection with a MagicMock so
make_connection's webrtc branch is exercised without dialing out."""
stub = MagicMock(name="UnitreeWebRTCConnection")
monkeypatch.setattr(go2_conn, "UnitreeWebRTCConnection", stub)
return stub


def test_make_connection_webrtc_forwards_aes_128_key(stub_webrtc: MagicMock) -> None:
"""Webrtc branch must forward aes_128_key as a kwarg to UnitreeWebRTCConnection.

Guards the Go2 half of the PR #2117 fix: without this forwarding, Go2
robots on firmware >=1.1.15 fail the WebRTC handshake even when a key
is provided in config.
"""
cfg = SimpleNamespace(unitree_connection_type="webrtc")
go2_conn.make_connection("192.168.123.161", cfg, aes_128_key="cafe" * 8)
stub_webrtc.assert_called_once_with("192.168.123.161", aes_128_key="cafe" * 8)
113 changes: 113 additions & 0 deletions dimos/robot/unitree/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for UnitreeWebRTCConnection's aes_128_key kwarg + env-var fallback.

Pure-Python tests — no hardware, no network. Mocks the LegionConnection driver
and the connect() side-effect so __init__ stays inside the kwarg-forwarding logic.
"""

from typing import Any
from unittest.mock import MagicMock

import pytest

from dimos.robot.unitree import connection as conn_mod
from dimos.robot.unitree.connection import UnitreeWebRTCConnection


@pytest.fixture
def stub_legion(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
"""Replace LegionConnection in the module with a MagicMock and suppress
UnitreeWebRTCConnection.connect so __init__ doesn't try to dial out."""
monkeypatch.setattr(UnitreeWebRTCConnection, "connect", lambda self: None)
legion = MagicMock(name="LegionConnection")
monkeypatch.setattr(conn_mod, "LegionConnection", legion)
return legion


def _aes_kwarg(legion: MagicMock) -> Any:
"""Pull aes_128_key out of the LegionConnection call args, or None if absent."""
_args, kwargs = legion.call_args
return kwargs.get("aes_128_key")


def test_aes_key_omitted_when_neither_kwarg_nor_env(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Default behaviour: no kwarg, no env var → aes_128_key not forwarded.

Guarantees the call is byte-identical to the pre-PR behaviour for users
on G1 firmware <1.5.1 and all Go2 robots.
"""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161")
assert _aes_kwarg(stub_legion) is None
assert "aes_128_key" not in stub_legion.call_args.kwargs


def test_aes_key_from_explicit_kwarg(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Caller passes the key directly → forwarded verbatim."""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="aa" * 16)
assert _aes_kwarg(stub_legion) == "aa" * 16


def test_aes_key_from_env_when_kwarg_none(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Env-var fallback: kwarg unset → UNITREE_AES_128_KEY is used."""
monkeypatch.setenv("UNITREE_AES_128_KEY", "bb" * 16)
UnitreeWebRTCConnection(ip="192.168.123.161")
assert _aes_kwarg(stub_legion) == "bb" * 16


def test_explicit_kwarg_beats_env(stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch) -> None:
"""Precedence: explicit kwarg wins over UNITREE_AES_128_KEY env var."""
monkeypatch.setenv("UNITREE_AES_128_KEY", "from-env")
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="from-kwarg")
assert _aes_kwarg(stub_legion) == "from-kwarg"


def test_empty_string_kwarg_falls_back_to_env_when_unset(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Empty-string kwarg + unset env → final key is None, nothing forwarded.

Truthiness guard means `aes_128_key=""` falls through to the env-var
lookup just like `aes_128_key=None` would. With the env unset, the final
value is None and the kwarg is omitted from the LegionConnection call —
same byte-identical behaviour as the unset case.
"""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="")
assert "aes_128_key" not in stub_legion.call_args.kwargs


def test_empty_string_kwarg_uses_env_when_set(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Empty-string kwarg + set env → env value wins.

Guards against the bug greptile flagged on PR #2117: a YAML/JSON config
serialising `aes_128_key: ""` instead of `null` MUST still allow the env
var to provide the real key. Otherwise the connection silently fails on
G1 firmware >=1.5.1 (and Go2 firmware >=1.1.15) with
'RSA key format is not supported'.
"""
monkeypatch.setenv("UNITREE_AES_128_KEY", "cc" * 16)
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="")
assert _aes_kwarg(stub_legion) == "cc" * 16