diff --git a/environment.yaml b/environment.yaml index 2fc42d5..859fc99 100644 --- a/environment.yaml +++ b/environment.yaml @@ -18,6 +18,7 @@ dependencies: - neuracore - neuracore_types - pyrealsense2 + - feetech-servo-sdk diff --git a/example-agilex-dictionary.txt b/example-agilex-dictionary.txt index 25fbae6..e21a1f7 100644 --- a/example-agilex-dictionary.txt +++ b/example-agilex-dictionary.txt @@ -11,6 +11,9 @@ fromarray hyperparameters imshow Levenberg +lerobot +Lerobot +LEROBOT linalg Marquardt mathbb @@ -49,3 +52,10 @@ Viser wxyz xyzw yourdfpy +scservo +BAUDRATE +huggingface +LOBYTE +HIBYTE +Feetech +Millis diff --git a/examples/X_leader_arm_teleop_agilex.py b/examples/X_leader_arm_teleop_agilex.py new file mode 100644 index 0000000..4959858 --- /dev/null +++ b/examples/X_leader_arm_teleop_agilex.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +"""Piper (Agilex) teleop with SO100 leader arm - direct joint mapping. + +Leader arm drives the Piper URDF in the GUI; optionally drive the real robot +with --real-robot. The leader arm class is configured with Piper follower +limits, offsets, and directions and outputs mapped joint angles + gripper. + +Requirements: +- Leader arm: calibrated S0100 (use same --leader-id as when calibrating). +- Conda: piper-teleop. + +Usage: + cd example_agilex/examples + python 0_leader_arm_teleop_agilex.py --leader-port /dev/ttyACM0 --leader-id my_awesome_leader_arm + python 0_leader_arm_teleop_agilex.py --real-robot --leader-port /dev/ttyACM0 --leader-id my_awesome_leader_arm + + Move the leader arm to drive the URDF (and robot if --real-robot). Ctrl+C to exit. +""" + +import argparse +import sys +import threading +import time +import traceback +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.configs import NEUTRAL_JOINT_ANGLES, ROBOT_RATE, VISUALIZATION_RATE +from common.data_manager import DataManager, RobotActivityState +from common.leader_arm import SO100LeaderArm +from common.robot_visualizer import RobotVisualizer +from common.threads.leader_reader import leader_reader_thread + +# NOTE:Piper follower config for the leader arm (limits, offsets, directions, mapping). +# Leader 5 DOF + gripper -> Piper 6 DOF + gripper; Piper joint 4 is fixed at 0. + +PIPER_JOINT_LIMITS_DEG = np.array( + [ + (-150.0, 150.0), + (0.0, 180.0), + (-170.0, 0.0), + (-100.0, 100.0), + (-70.0, 70.0), + (-120.0, 120.0), + ], + dtype=np.float64, +) + +# NOTE: Measure leader zero vs Piper zero per joint (degrees). joint4 unused. +PIPER_OFFSETS_DEG = np.array( + [0.0, 90.0, -90.0, 0.0, 0.0, -45.0], + dtype=np.float64, +) + +PIPER_DIRECTIONS = np.array( + [-1.0, 1.0, 1.0, 1.0, 1.0, -1.0], + dtype=np.float64, +) + +# Leader joint index -> Piper joint index. Piper joint 4 (index 3) has no leader. +LEADER_TO_PIPER_JOINT = {0: 0, 1: 1, 2: 2, 3: 4, 4: 5} +PIPER_FIXED_JOINTS = {3: 0.0} + + +def main() -> None: + """Run Piper teleop with SO100 leader arm (URDF and optionally real robot).""" + from common.configs import URDF_PATH + + parser = argparse.ArgumentParser( + description="Piper URDF teleop with SO100 leader arm." + ) + parser.add_argument("--leader-port", type=str, default="/dev/ttyACM0") + parser.add_argument("--leader-id", type=str, default="my_awesome_leader_arm") + parser.add_argument("--leader-rate", type=float, default=50.0) + parser.add_argument( + "--real-robot", + action="store_true", + help="Use the real Piper robot (default: URDF only)", + ) + parser.add_argument("--can-interface", type=str, default="can0") + args = parser.parse_args() + + use_real_robot = args.real_robot + print("=" * 60) + print( + "PIPER TELEOP WITH SO100 LEADER ARM" + + (" - REAL ROBOT" if use_real_robot else " - URDF only") + ) + print("=" * 60) + + leader = SO100LeaderArm(port=args.leader_port, calibration_id=args.leader_id) + leader.configure_follower( + follower_limits_deg=PIPER_JOINT_LIMITS_DEG, + follower_offsets_deg=PIPER_OFFSETS_DEG, + follower_directions=PIPER_DIRECTIONS, + leader_to_follower_joint=LEADER_TO_PIPER_JOINT, + fixed_joints=PIPER_FIXED_JOINTS, + ) + print("\n๐Ÿฆพ Connecting to S0100 leader arm...") + try: + leader.connect(calibrate=False) + except Exception as e: + print(f"Failed to connect to leader: {e}") + if "no calibration registered" in str(e): + print( + "Create a calibration JSON for this leader id under " + "~/.cache/huggingface/lerobot/calibration/teleoperators/so_leader/" + ) + sys.exit(1) + print("โœ“ Leader arm connected") + + data_manager = DataManager() + + robot_controller = None + joint_state_thread_obj = None + if use_real_robot: + from common.threads.joint_state import joint_state_thread + + from piper_controller import PiperController + + print("\n๐Ÿค– Initializing Piper robot controller...") + robot_controller = PiperController( + can_interface=args.can_interface, + robot_rate=ROBOT_RATE, + control_mode=PiperController.ControlMode.JOINT_SPACE, + neutral_joint_angles=NEUTRAL_JOINT_ANGLES, + debug_mode=False, + ) + robot_controller.start_control_loop() + print("๐Ÿ“Š Starting joint state thread...") + joint_state_thread_obj = threading.Thread( + target=joint_state_thread, + args=(data_manager, robot_controller), + daemon=True, + ) + joint_state_thread_obj.start() + + leader_thread = threading.Thread( + target=leader_reader_thread, + args=(data_manager, leader, args.leader_rate), + daemon=True, + ) + leader_thread.start() + + visualizer = RobotVisualizer(urdf_path=URDF_PATH) + visualizer.add_basic_controls() + visualizer.add_teleop_controls() + visualizer.add_gripper_status_controls() + if use_real_robot: + visualizer.add_homing_controls() + visualizer.add_toggle_robot_enabled_status_button() + + def toggle_robot_enabled() -> None: + assert robot_controller is not None # Only registered when use_real_robot + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + robot_controller.graceful_stop() + data_manager.set_teleop_state(False, None, None) + visualizer.update_toggle_robot_enabled_status(False) + print("โœ“ ๐Ÿ”ด Robot disabled") + elif state == RobotActivityState.DISABLED: + if robot_controller.resume_robot(): + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + visualizer.update_toggle_robot_enabled_status(True) + print("โœ“ ๐ŸŸข Robot enabled") + else: + print("โœ— Failed to enable robot") + + def on_go_home() -> None: + assert robot_controller is not None # Only registered when use_real_robot + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + data_manager.set_teleop_state(False, None, None) + ok = robot_controller.move_to_home() + if not ok: + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + else: + print("โš ๏ธ Cannot home: robot not enabled") + + visualizer.set_toggle_robot_enabled_status_callback(toggle_robot_enabled) + visualizer.set_go_home_callback(on_go_home) + + print() + if use_real_robot: + print( + "๐Ÿš€ Leader arm driving REAL Piper robot. Enable robot in GUI, then move leader. Ctrl+C to exit." + ) + else: + print("๐Ÿš€ Leader arm driving Piper URDF. Move the leader arm. Ctrl+C to exit.") + print() + + dt_viz = 1.0 / VISUALIZATION_RATE + try: + while True: + t0 = time.time() + mapped_angles, mapped_gripper = data_manager.get_leader_mapped_state() + if mapped_angles is not None and mapped_gripper is not None: + data_manager.set_target_joint_angles(mapped_angles) + data_manager.set_controller_data(None, 1.0, 1.0 - mapped_gripper) + data_manager.set_teleop_state(True, None, None) + if not use_real_robot: + data_manager.set_current_joint_angles(mapped_angles) + + current_joint_angles = data_manager.get_current_joint_angles() + _, grip_value, trigger_value = data_manager.get_controller_data() + target_joint_angles = data_manager.get_target_joint_angles() + + visualizer.set_grip_value(grip_value) + visualizer.set_trigger_value(trigger_value) + visualizer.update_teleop_status(data_manager.get_teleop_active()) + + if use_real_robot: + robot_activity_state = data_manager.get_robot_activity_state() + if robot_activity_state == RobotActivityState.ENABLED: + visualizer.update_robot_status("Robot Status: Enabled") + elif robot_activity_state == RobotActivityState.HOMING: + visualizer.update_robot_status("Robot Status: Homing") + else: + visualizer.update_robot_status("Robot Status: Disabled") + if ( + target_joint_angles is not None + and robot_activity_state == RobotActivityState.ENABLED + ): + visualizer.update_ghost_robot_visibility(True) + visualizer.update_ghost_robot_pose(np.radians(target_joint_angles)) + else: + visualizer.update_ghost_robot_visibility(False) + visualizer.update_gripper_status( + trigger_value, + robot_enabled=(robot_activity_state == RobotActivityState.ENABLED), + ) + else: + visualizer.update_robot_status("URDF only โ€“ leader arm driving") + visualizer.update_ghost_robot_visibility(False) + visualizer.update_gripper_status(trigger_value, robot_enabled=True) + + if current_joint_angles is not None: + current_rad = np.radians(current_joint_angles) + visualizer.update_robot_pose(current_rad) + visualizer.update_joint_angles_display(current_rad) + + elapsed = time.time() - t0 + if dt_viz - elapsed > 0: + time.sleep(dt_viz - elapsed) + + except KeyboardInterrupt: + print("\n\n๐Ÿ‘‹ Interrupt โ€“ shutting down...") + except Exception as e: + print(f"\nโŒ Error: {e}") + traceback.print_exc() + + print("\n๐Ÿงน Cleaning up...") + data_manager.request_shutdown() + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + leader_thread.join(timeout=2.0) + if joint_state_thread_obj is not None: + joint_state_thread_obj.join(timeout=2.0) + if robot_controller is not None: + robot_controller.cleanup() + leader.disconnect() + visualizer.stop() + print("๐Ÿ‘‹ Done.") + + +if __name__ == "__main__": + main() diff --git a/examples/common/data_manager.py b/examples/common/data_manager.py index 9e159a1..d2a380c 100644 --- a/examples/common/data_manager.py +++ b/examples/common/data_manager.py @@ -93,6 +93,17 @@ def __init__(self) -> None: self.rgb_image: np.ndarray | None = None +class LeaderMappedState: + """Leader arm mapped state - joint angles and gripper from leader read_mapped().""" + + def __init__(self) -> None: + """Initialize LeaderMappedState with default values.""" + self._lock = threading.Lock() + + self.joint_angles: np.ndarray | None = None + self.gripper_open: float | None = None + + class DataManager: """Main state container coordinating all state groups. @@ -112,6 +123,7 @@ def __init__(self) -> None: self._robot_state = RobotState() self._ik_state = IKState() self._camera_state = CameraState() + self._leader_mapped_state = LeaderMappedState() # System state self._shutdown_event = threading.Event() @@ -530,6 +542,42 @@ def get_ik_success(self) -> bool: with self._ik_state._lock: return self._ik_state.success + # ============================================================================ + # Leader Mapped State Methods + # ============================================================================ + + def set_leader_mapped_state( + self, joint_angles: np.ndarray, gripper_open: float + ) -> None: + """Set leader-mapped joint angles and gripper (thread-safe). + + Args: + joint_angles: Follower-space joint angles from leader read_mapped(). + gripper_open: Gripper open amount in [0, 1]. + """ + with self._leader_mapped_state._lock: + self._leader_mapped_state.joint_angles = ( + joint_angles.copy() if joint_angles is not None else None + ) + self._leader_mapped_state.gripper_open = gripper_open + + def get_leader_mapped_state( + self, + ) -> tuple[np.ndarray | None, float | None]: + """Get leader-mapped joint angles and gripper (thread-safe). + + Returns: + Tuple of (joint_angles, gripper_open); either may be None if not set. + """ + with self._leader_mapped_state._lock: + angles = ( + self._leader_mapped_state.joint_angles.copy() + if self._leader_mapped_state.joint_angles is not None + else None + ) + gripper = self._leader_mapped_state.gripper_open + return (angles, gripper) + # ============================================================================ # System State Methods # ============================================================================ diff --git a/examples/common/leader_arm.py b/examples/common/leader_arm.py new file mode 100644 index 0000000..ecab1ca --- /dev/null +++ b/examples/common/leader_arm.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python3 +"""SO100 leader arm: connect, read, and map to a configured follower. + +This module avoids runtime dependency on lerobot by talking to the motors +directly through scservo_sdk and loading calibration JSON from the same default +cache path used by lerobot tools. +""" + +from __future__ import annotations + +import json +import os +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np +import scservo_sdk as scs + +# Fixed S0100 leader arm parameters (do not change per follower). +JOINT_ACTION_KEYS = [ + "shoulder_pan.pos", + "shoulder_lift.pos", + "elbow_flex.pos", + "wrist_flex.pos", + "wrist_roll.pos", +] +GRIPPER_ACTION_KEY = "gripper.pos" +NUM_JOINTS = 5 +USE_DEGREES = True + +MOTOR_NAMES = [ + "shoulder_pan", + "shoulder_lift", + "elbow_flex", + "wrist_flex", + "wrist_roll", + "gripper", +] +MOTOR_IDS = {name: idx + 1 for idx, name in enumerate(MOTOR_NAMES)} + +MOTOR_MODEL_NUMBER = 777 +MOTOR_RESOLUTION = 4096 +BAUDRATE = 1_000_000 +TIMEOUT_MS = 1000 +PROTOCOL_VERSION = 0 + +SIGN_BIT = 15 +ADDR_RETURN_DELAY_TIME = (7, 1) +ADDR_MAXIMUM_ACCELERATION = (85, 1) +ADDR_ACCELERATION = (41, 1) +ADDR_OPERATING_MODE = (33, 1) +ADDR_TORQUE_ENABLE = (40, 1) +ADDR_LOCK = (55, 1) +ADDR_PRESENT_POSITION = (56, 2) + +_HF_HOME = Path(os.getenv("HF_HOME", Path.home() / ".cache" / "huggingface")) +_LEROBOT_HOME = Path(os.getenv("HF_LEROBOT_HOME", _HF_HOME / "lerobot")) +_CALIBRATION_DIR = Path( + os.getenv("HF_LEROBOT_CALIBRATION", _LEROBOT_HOME / "calibration") +) + + +@dataclass +class MotorCalibration: + """Per-motor calibration loaded from JSON.""" + + id: int + drive_mode: int + homing_offset: int + range_min: int + range_max: int + + +def _decode_sign_magnitude(encoded: int, sign_bit: int = SIGN_BIT) -> int: + direction_bit = (encoded >> sign_bit) & 1 + magnitude = encoded & ((1 << sign_bit) - 1) + return -magnitude if direction_bit else magnitude + + +def _raw_to_degrees(raw: int, range_min: int, range_max: int) -> float: + mid = (range_min + range_max) / 2 + return (raw - mid) * 360.0 / (MOTOR_RESOLUTION - 1) + + +def _raw_to_percent(raw: int, range_min: int, range_max: int) -> float: + bounded = min(range_max, max(range_min, raw)) + return ((bounded - range_min) / (range_max - range_min)) * 100.0 + + +def _load_leader_calibration( + calibration_id: str, +) -> dict[str, MotorCalibration] | None: + cal_file = ( + _CALIBRATION_DIR / "teleoperators" / "so_leader" / f"{calibration_id}.json" + ) + if not cal_file.is_file(): + return None + + with open(cal_file, encoding="utf-8") as file: + raw: dict = json.load(file) + + calibration: dict[str, MotorCalibration] = {} + for motor_name, values in raw.items(): + calibration[motor_name] = MotorCalibration( + id=values["id"], + drive_mode=values["drive_mode"], + homing_offset=values["homing_offset"], + range_min=values["range_min"], + range_max=values["range_max"], + ) + return calibration + + +def _to_bytes(value: int, length: int) -> list[int]: + if length == 1: + return [value & 0xFF] + if length == 2: + return [scs.SCS_LOBYTE(value), scs.SCS_HIBYTE(value)] + raise ValueError(f"Unsupported byte length: {length}") + + +def _patched_set_packet_timeout(port_handler: Any, packet_length: int) -> None: + """Replacement for PortHandler.setPacketTimeout with relaxed timing.""" + port_handler.packet_timeout = ( + port_handler.tx_time_per_byte * packet_length + + port_handler.tx_time_per_byte * 3.0 + + 50 + ) + + +class _FeetechLeaderBus: + """Minimal motor-bus helper for SO100 leader arm reads.""" + + def __init__(self, port: str, calibration: dict[str, MotorCalibration]) -> None: + self.port = port + self.calibration = calibration + self.motor_ids = dict(MOTOR_IDS) + self.port_handler = scs.PortHandler(port) + self.port_handler.setPacketTimeout = _patched_set_packet_timeout.__get__( + self.port_handler, type(self.port_handler) + ) + self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION) + self.sync_reader = scs.GroupSyncRead( + self.port_handler, self.packet_handler, 0, 0 + ) + self._connected = False + + def connect(self) -> None: + if not self.port_handler.openPort(): + raise ConnectionError(f"Failed to open leader port {self.port}") + self.port_handler.setBaudRate(BAUDRATE) + self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) + self._verify_motors() + self._configure_leader() + self._connected = True + + def _verify_motors(self) -> None: + missing = [] + for name, motor_id in self.motor_ids.items(): + model_number, comm, _ = self.packet_handler.ping( + self.port_handler, motor_id + ) + if comm != scs.COMM_SUCCESS: + missing.append(f" - ID {motor_id} ({name}): no response") + elif model_number != MOTOR_MODEL_NUMBER: + missing.append( + f" - ID {motor_id} ({name}): wrong model {model_number}" + ) + + if missing: + raise ConnectionError( + f"Leader motor check failed on {self.port}:\n" + "\n".join(missing) + ) + + def _write_register( + self, motor_name: str, addr: int, length: int, value: int, num_retry: int = 2 + ) -> None: + motor_id = self.motor_ids[motor_name] + data = _to_bytes(value, length) + for _ in range(1 + num_retry): + comm, _ = self.packet_handler.writeTxRx( + self.port_handler, motor_id, addr, length, data + ) + if comm == scs.COMM_SUCCESS: + time.sleep(0.005) + return + time.sleep(0.02) + msg = self.packet_handler.getTxRxResult(comm) + raise ConnectionError( + f"Failed register write on motor '{motor_name}' (ID {motor_id}): {msg}" + ) + + def _disable_torque(self) -> None: + for name in self.motor_ids: + self._write_register(name, *ADDR_TORQUE_ENABLE, 0) + self._write_register(name, *ADDR_LOCK, 0) + + def _configure_leader(self) -> None: + self._disable_torque() + for name in self.motor_ids: + self._write_register(name, *ADDR_RETURN_DELAY_TIME, 0) + self._write_register(name, *ADDR_MAXIMUM_ACCELERATION, 254) + self._write_register(name, *ADDR_ACCELERATION, 254) + self._write_register(name, *ADDR_OPERATING_MODE, 0) + + def disconnect(self) -> None: + try: + self._disable_torque() + except Exception: + pass + self.port_handler.closePort() + self._connected = False + + @property + def is_connected(self) -> bool: + return self._connected and self.port_handler.is_open + + def read_positions(self, num_retry: int = 3) -> dict[str, float]: + addr, length = ADDR_PRESENT_POSITION + comm = None + for _ in range(1 + num_retry): + self.sync_reader.clearParam() + self.sync_reader.start_address = addr + self.sync_reader.data_length = length + for motor_id in self.motor_ids.values(): + self.sync_reader.addParam(motor_id) + + comm = self.sync_reader.txRxPacket() + if comm == scs.COMM_SUCCESS: + break + time.sleep(0.01) + + if comm != scs.COMM_SUCCESS: + msg = self.packet_handler.getTxRxResult(comm) + raise ConnectionError(f"Leader sync read failed on {self.port}: {msg}") + + result: dict[str, float] = {} + for name, motor_id in self.motor_ids.items(): + raw = self.sync_reader.getData(motor_id, addr, length) + decoded = _decode_sign_magnitude(raw) + calibration = self.calibration[name] + if name == "gripper": + value = _raw_to_percent( + decoded, calibration.range_min, calibration.range_max + ) + else: + value = _raw_to_degrees( + decoded, calibration.range_min, calibration.range_max + ) + result[name] = float(value) + return result + + +class SO100LeaderArm: + """SO100 leader arm: read raw or mapped to a configured follower.""" + + def __init__(self, port: str, calibration_id: str) -> None: + """Configure the leader arm (does not connect).""" + self._port = port + self._calibration_id = calibration_id + self._bus: _FeetechLeaderBus | None = None + self._follower_limits_deg: np.ndarray | None = None + self._follower_offsets_deg: np.ndarray | None = None + self._follower_directions: np.ndarray | None = None + self._leader_to_follower_joint: dict[int, int] | None = None + self._fixed_joints: dict[int, float] = {} + + def connect(self, calibrate: bool = False) -> None: + """Connect to the leader arm. Raises if port or calibration fails.""" + if calibrate: + raise RuntimeError( + "calibrate=True is not supported by this script. " + "Run calibration separately, then connect with calibrate=False." + ) + + calibration = _load_leader_calibration(self._calibration_id) + if calibration is None: + expected = _CALIBRATION_DIR / "teleoperators" / "so_leader" + raise RuntimeError( + f"no calibration registered for leader id '{self._calibration_id}'. " + f"Expected JSON under: {expected}" + ) + + missing_motors = [name for name in MOTOR_NAMES if name not in calibration] + if missing_motors: + raise RuntimeError( + "invalid calibration file: missing motors " + ", ".join(missing_motors) + ) + + self._bus = _FeetechLeaderBus(self._port, calibration) + try: + self._bus.connect() + except Exception: + self._bus.disconnect() + self._bus = None + raise + + def disconnect(self) -> None: + """Disconnect from the leader arm.""" + if self._bus is not None: + self._bus.disconnect() + self._bus = None + + def read(self) -> dict[str, float]: + """Read current joint angles (degrees) and gripper (0โ€“100). Raw leader output.""" + if self._bus is None or not self._bus.is_connected: + raise RuntimeError("Leader arm is not connected") + + positions = self._bus.read_positions() + action = { + key: float(positions.get(name, 0.0)) + for key, name in zip(JOINT_ACTION_KEYS, MOTOR_NAMES[:NUM_JOINTS]) + } + action[GRIPPER_ACTION_KEY] = float( + np.clip(positions.get("gripper", 50.0), 0.0, 100.0) + ) + return action + + def configure_follower( + self, + follower_limits_deg: np.ndarray, + follower_offsets_deg: np.ndarray, + follower_directions: np.ndarray, + leader_to_follower_joint: dict[int, int] | list[int], + fixed_joints: dict[int, float] | None = None, + ) -> None: + """Set follower mapping so read_mapped() returns follower-space angles. + + Args: + follower_limits_deg: (n_follower_joints, 2) min/max in degrees per joint. + follower_offsets_deg: (n_follower_joints,) offset per follower joint (leader 0ยฐ -> follower offsetยฐ). + follower_directions: (n_follower_joints,) sign per follower joint (+1 or -1). + leader_to_follower_joint: leader joint index -> follower joint index (dict or list of length NUM_JOINTS). + fixed_joints: optional dict {follower_joint_index: value} for joints with no leader (e.g. {3: 0.0}). + """ + n = follower_limits_deg.shape[0] + assert follower_limits_deg.shape == (n, 2) + assert follower_offsets_deg.shape == (n,) + assert follower_directions.shape == (n,) + if isinstance(leader_to_follower_joint, dict): + assert set(leader_to_follower_joint.keys()) == set(range(NUM_JOINTS)) + else: + assert len(leader_to_follower_joint) == NUM_JOINTS + self._follower_limits_deg = np.asarray(follower_limits_deg, dtype=np.float64) + self._follower_offsets_deg = np.asarray(follower_offsets_deg, dtype=np.float64) + self._follower_directions = np.asarray(follower_directions, dtype=np.float64) + self._leader_to_follower_joint = ( + dict(leader_to_follower_joint) + if isinstance(leader_to_follower_joint, dict) + else {i: v for i, v in enumerate(leader_to_follower_joint)} + ) + self._fixed_joints = dict(fixed_joints) if fixed_joints else {} + + def read_mapped(self) -> tuple[np.ndarray, float]: + """Read leader and return follower-space joint angles (degrees) and gripper open (0โ€“1). + + Must call configure_follower() first. Clips to follower limits; fixed joints set as configured. + """ + if ( + self._follower_limits_deg is None + or self._follower_offsets_deg is None + or self._follower_directions is None + or self._leader_to_follower_joint is None + ): + raise RuntimeError( + "configure_follower() must be called before read_mapped()" + ) + raw = self.read() + n = self._follower_limits_deg.shape[0] + angles = np.zeros(n, dtype=np.float64) + for fj, val in self._fixed_joints.items(): + angles[fj] = val + for i, key in enumerate(JOINT_ACTION_KEYS): + leader_val = raw.get(key, 0.0) + fj = self._leader_to_follower_joint[i] + lo, hi = self._follower_limits_deg[fj, 0], self._follower_limits_deg[fj, 1] + angles[fj] = np.clip( + leader_val * self._follower_directions[fj] + + self._follower_offsets_deg[fj], + lo, + hi, + ) + gripper = float(np.clip(raw.get(GRIPPER_ACTION_KEY, 50.0) / 100.0, 0.0, 1.0)) + return angles, gripper + + @property + def is_connected(self) -> bool: + """True if the leader arm is connected.""" + return self._bus is not None and self._bus.is_connected + + @staticmethod + def joint_keys() -> list[str]: + """Ordered list of body joint keys (no gripper).""" + return list(JOINT_ACTION_KEYS) + + @staticmethod + def gripper_key() -> str: + """Key for gripper value in the action dict.""" + return GRIPPER_ACTION_KEY + + +class LerobotSO100LeaderArm(SO100LeaderArm): + """Backward-compatible alias for existing imports.""" diff --git a/examples/common/threads/leader_reader.py b/examples/common/threads/leader_reader.py new file mode 100644 index 0000000..ead2a84 --- /dev/null +++ b/examples/common/threads/leader_reader.py @@ -0,0 +1,47 @@ +"""Leader arm reader thread โ€“ reads mapped output from leader and writes to DataManager.""" + +import time +import traceback + +from common.data_manager import DataManager +from common.leader_arm import SO100LeaderArm + + +def leader_reader_thread( + data_manager: DataManager, + leader_arm: SO100LeaderArm, + rate_hz: float, +) -> None: + """Read leader arm at rate_hz via read_mapped() and store in data_manager. + + Leader arm must have configure_follower() called before starting this thread. + """ + dt = 1.0 / rate_hz + print("๐Ÿฆพ Leader reader thread started") + try: + while not data_manager.is_shutdown_requested(): + t0 = time.perf_counter() + try: + joint_angles, gripper_open = leader_arm.read_mapped() + data_manager.set_leader_mapped_state(joint_angles, gripper_open) + except RuntimeError as e: + if "no calibration registered" in str(e): + print( + "โŒ Leader has no calibration. Add calibration JSON for this leader id." + ) + data_manager.request_shutdown() + elif "configure_follower" in str(e): + print( + "โŒ Leader arm has no follower config. Call configure_follower() before starting the thread." + ) + data_manager.request_shutdown() + break + elapsed = time.perf_counter() - t0 + if dt - elapsed > 0: + time.sleep(dt - elapsed) + except Exception as e: + print(f"โŒ Leader reader error: {e}") + traceback.print_exc() + data_manager.request_shutdown() + finally: + print("๐Ÿฆพ Leader reader thread stopped")