diff --git a/example-agilex-dictionary.txt b/example-agilex-dictionary.txt index 25fbae6..8defd6b 100644 --- a/example-agilex-dictionary.txt +++ b/example-agilex-dictionary.txt @@ -49,3 +49,5 @@ Viser wxyz xyzw yourdfpy +Lerobot +readchar diff --git a/examples/10_leader_arm_teleop_agilex.py b/examples/10_leader_arm_teleop_agilex.py new file mode 100644 index 0000000..629f4ae --- /dev/null +++ b/examples/10_leader_arm_teleop_agilex.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +"""AgileX PiPER Teleoperation using LeRobot Leader Arm and Foot Pedal. + +This script maps a physical LeRobot leader arm and a 3-button Foot Pedal +to a simulated AgileX PiPER robot in Neuracore. +""" + +import argparse +import os +import sys +import threading +import time +import traceback +from pathlib import Path + +import numpy as np + +# Resolve workspace paths +_file_path = Path(__file__).resolve() +_example_agilex_root = _file_path.parent.parent +_ws_root = _example_agilex_root.parent + +# Add example_so101 to path for the leader_arm module +_example_so101_root = _ws_root / "example_so101" +sys.path.insert(0, str(_example_so101_root)) +sys.path.insert(0, str(_example_so101_root / "examples")) + +# Add neuracore and other dependencies +_neuracore_pkg_root = _ws_root / "neuracore" +sys.path.insert(0, str(_neuracore_pkg_root)) +sys.path.insert( + 0, str(_example_agilex_root) +) # Fix for local piper_controller if needed + +# CRITICAL: Set PYTHONPATH so that spawned subprocesses (like the data daemon) can find neuracore +os.environ["PYTHONPATH"] = ( + str(_neuracore_pkg_root) + os.pathsep + os.environ.get("PYTHONPATH", "") +) + +import neuracore as nc # noqa: E402 + +# Import from example_so101 common components +from common.data_manager import DataManager, RobotActivityState # noqa: E402 +from common.leader_arm import LerobotSO101LeaderArm # noqa: E402 +from neuracore.core.input_devices.foot_pedal import FootPedal # noqa: E402 + +# AgileX PiPER Configurations +PIPER_JOINT_NAMES = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"] +# Limits in degrees +PIPER_LIMITS_DEG = np.array( + [ + (-150.0, 150.0), + (0.0, 180.0), + (-170.0, 0.0), + (-100.0, 100.0), + (-70.0, 70.0), + (-120.0, 120.0), + ], + dtype=np.float64, +) + +PIPER_OFFSETS_DEG = np.zeros(6, dtype=np.float64) +PIPER_DIRECTIONS = np.ones(6, dtype=np.float64) + +# Leader (5 joints) to AgileX (6 joints) mapping +# Leader 0 (Pan) -> Piper joint1 +# Leader 1 (Lift) -> Piper joint2 +# Leader 2 (Elbow) -> Piper joint3 +# Leader 3 (Wrist Flex) -> Piper joint5 +# Leader 4 (Wrist Roll) -> Piper joint6 +LEADER_TO_PIPER_JOINT = {0: 0, 1: 1, 2: 2, 3: 4, 4: 5} +PIPER_FIXED_JOINTS = {3: 0.0} # joint4 fixed at 0 + +URDF_PATH = ( + _example_agilex_root / "piper_description" / "urdf" / "piper_description.urdf" +) + + +def leader_reader_thread( + data_manager: DataManager, leader: LerobotSO101LeaderArm, rate: float +) -> None: + """Thread for reading data from the leader arm and updating the shared state. + + Args: + data_manager: Shared state manager. + leader: Initialized leader arm instance. + rate: Frequency in Hz to read the arm. + """ + dt = 1.0 / rate + while not data_manager.is_shutdown_requested(): + try: + if leader.is_connected: + angles, gripper = leader.read_mapped() + data_manager.set_leader_mapped_state(angles, gripper) # type: ignore[attr-defined] + except Exception as e: + print(f"Error in leader reader thread: {e}") + time.sleep(dt) + + +def main() -> None: + """Main execution entry point for teleoperation script.""" + parser = argparse.ArgumentParser( + description="AgileX PiPER Teleop with LeRobot Leader and Foot Pedal" + ) + parser.add_argument( + "--leader-port", + type=str, + default="/dev/ttyACM0", + help="USB port for leader arm", + ) + parser.add_argument( + "--leader-id", + type=str, + default="my_awesome_leader_arm", + help="LeRobot calibration id", + ) + parser.add_argument( + "--leader-rate", type=float, default=50.0, help="Reading rate in Hz" + ) + parser.add_argument( + "--robot-name", type=str, default="AgileX PiPER", help="Robot name in Neuracore" + ) + + # Pedal keys (Matching OpenArm unified mapping) + parser.add_argument("--pedal-activate", type=str, default="a") + parser.add_argument("--pedal-home", type=str, default="h") + parser.add_argument("--pedal-record", type=str, default="r") + + args = parser.parse_args() + + print("=" * 60) + print("AGILEX PIPER LE ROBOT + FOOT PEDAL TELEOP") + print("=" * 60) + + # 1. Initialize Leader Arm + print(f"\n🦾 Connecting to Le Robot leader on {args.leader_port}...") + leader = LerobotSO101LeaderArm(port=args.leader_port, calibration_id=args.leader_id) + try: + leader.connect(calibrate=False) + except Exception as e: + print(f"āŒ Failed to connect to leader: {e}") + print("Tip: Run the robust calibration script first.") + sys.exit(1) + + # Adjusted Piper Config for better mapping + local_limits_deg = np.array( + [ + (-150.0, 150.0), + (0.0, 160.0), + (-160.0, 0.0), # J1, J2, J3 + (-100.0, 100.0), + (-100.0, 100.0), + (-180.0, 180.0), # J4, J5, J6 + ], + dtype=np.float64, + ) + + # Reverting to the previous working offsets as requested + # These worked well for the user in the initial session + local_offsets_deg = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) + local_directions = np.array([1.0, 1.0, -1.0, 1.0, 1.0, 1.0]) + + leader.configure_follower( + follower_limits_deg=local_limits_deg, + follower_offsets_deg=local_offsets_deg, + follower_directions=local_directions, + leader_to_follower_joint=LEADER_TO_PIPER_JOINT, + fixed_joints=PIPER_FIXED_JOINTS, + ) + print("āœ“ Leader arm configured with WORKING offsets (Reverted)") + + # Neutral Pose for Parking/Home (degrees) - Using official values + NEUTRAL_POSE_DEG = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) + + # 2. Initialize Data Manager and Foot Pedal + data_manager = DataManager() + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) # Start DISABLED + + pedal = FootPedal() + + # HARDCODE working keys for the user's specific FootSwitch + # a = ACTIVATE, b = HOME, c = RECORD + activate_key = "a" + home_key = "b" + record_key = "c" + + # Save this config so the FootPedal class picks it up + pedal.save_config( + activate_key=activate_key, home_key=home_key, record_key=record_key + ) + print(f"āœ“ PEDAL MAPPINGS ENFORCED: {pedal.mappings}") + + # Define callbacks first + def on_activate() -> None: + """Toggle the robot enable/disable state.""" + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + print("āœ“ [PEDAL] Robot DISABLED (Parking...)") + else: + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + print("āœ“ [PEDAL] Robot ENABLED (Following Leader)") + + def on_home() -> None: + """Trigger the homing procedure.""" + print("šŸ  [PEDAL] Moving to home position...") + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + + def on_record() -> None: + """Toggle Neuracore recording session.""" + print("āŗļø [PEDAL] Toggling recording...") + current_status = nc.is_recording() + print(f"šŸ” [DEBUG] Current Neuracore recording status: {current_status}") + if not current_status: + try: + nc.start_recording() + print("āœ“ [PEDAL] Recording STARTED") + except Exception as e: + print(f"āœ— [PEDAL] Failed to start recording: {e}") + else: + try: + nc.stop_recording() + print("āœ“ [PEDAL] Recording STOPPED") + except Exception as e: + print(f"āœ— [PEDAL] Failed to stop recording: {e}") + + # Explicitly register + pedal.on("activate", on_activate) + pedal.on("home", on_home) + pedal.on("record", on_record) + + pedal.start() + print( + f"āŒØļø Foot Pedal listener started (Keys: {args.pedal_activate}, {args.pedal_home}, {args.pedal_record})" + ) + + # 3. Initialize Neuracore + print("\nšŸ”§ Initializing Neuracore...") + nc.login() + print(f"šŸ“¦ Connecting to Robot: {args.robot_name}") + nc.connect_robot( + robot_name=args.robot_name, urdf_path=str(URDF_PATH), overwrite=True + ) + + ds_name = f"piper-leader-teleop-{time.strftime('%Y%m%d-%H%M%S')}" + print(f"šŸ“‚ Creating Dataset: {ds_name}") + nc.create_dataset(name=ds_name) + + # 4. Start Leader Reading Thread + reader_thread = threading.Thread( + target=leader_reader_thread, + args=(data_manager, leader, args.leader_rate), + daemon=True, + ) + reader_thread.start() + + print("\nšŸš€ TELEOP READY") + print("------------------------------------------------------------") + print("1. Robot starts in DISABLED (Neutral) pose.") + print(f"2. Press '{args.pedal_activate}' to ENABLE (Mirror Leader).") + print(f"3. Press '{args.pedal_record}' to Toggle Recording.") + print("------------------------------------------------------------") + + loop_count = 0 + try: + while True: + t0 = time.time() + mapped_angles, mapped_gripper = data_manager.get_leader_mapped_state() + state = data_manager.get_robot_activity_state() + + if state == RobotActivityState.HOMING: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + time.sleep(0.5) + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + elif state == RobotActivityState.DISABLED: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + else: + # ENABLED: Follow Leader + if mapped_angles is not None: + angles_deg = mapped_angles + gripper_val = mapped_gripper if mapped_gripper is not None else 0.5 + else: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + + # Streaming and Logging + if angles_deg is not None: + loop_count += 1 + # Debug every 30 loops (~2Hz) + if loop_count % 30 == 0: + if state == RobotActivityState.DISABLED: + print( + f"šŸ’” [REMINDER] Robot is DISABLED. Press '{args.pedal_activate}' to Mirror. [Raw Grip: {gripper_val:.2f}]" + ) + else: + print( + f"šŸ“” [MIRRORING] J1-3: {np.round(angles_deg[:3], 1)} | Grip: {gripper_val:.2f}" + ) + + # Log to Neuracore + data_dict = {} + for i, deg in enumerate(angles_deg): + val = float(np.radians(deg)) + # Explicit small offset for joint4 if 0.0 is problematic + if i == 3 and abs(val) < 0.0001: + val = 0.0001 + data_dict[f"joint{i+1}"] = val + + # Visual Gripper Joints (joint7, joint8) + if gripper_val is not None: + # joint7: 0 (closed) to 0.035 (open) + # joint8: 0 (closed) to -0.035 (open) + data_dict["joint7"] = float(gripper_val * 0.035) + data_dict["joint8"] = float(-gripper_val * 0.035) + + nc.log_joint_positions(data_dict, timestamp=t0) + + else: + if loop_count % 60 == 0: + print("āš ļø [WARNING] No leader data received! Is it plugged in?") + + time.sleep(max(0, 1.0 / 60.0 - (time.time() - t0))) + + except KeyboardInterrupt: + print("\nšŸ‘‹ Teleop stopped by user.") + except Exception as e: + print(f"\nāŒ Error: {e}") + traceback.print_exc() + finally: + data_manager.request_shutdown() + pedal.stop() + reader_thread.join(timeout=1.0) + leader.disconnect() + nc.logout() + print("šŸ‘‹ Shutdown complete.") + + +if __name__ == "__main__": + main() diff --git a/examples/7_teleop_with_pedal.py b/examples/7_teleop_with_pedal.py new file mode 100644 index 0000000..4860884 --- /dev/null +++ b/examples/7_teleop_with_pedal.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +"""Piper Robot Teleoperation with Meta Quest and Foot Pedal control. + +This script combines Meta Quest controller input for movement with Foot Pedal +control for session management (Activate, Home, Record). +""" + +import argparse +import multiprocessing +import sys +import threading +import time +from pathlib import Path +from typing import Any + +import neuracore as nc +import numpy as np + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) +# Add neuracore path for local imports if needed +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "neuracore")) + +from common.configs import ( + CONTROLLER_BETA, + CONTROLLER_D_CUTOFF, + CONTROLLER_MIN_CUTOFF, + GRIPPER_FRAME_NAME, + GRIPPER_LOGGING_NAME, + JOINT_NAMES, + NEUTRAL_JOINT_ANGLES, + POSTURE_COST_VECTOR, + ROBOT_RATE, + URDF_PATH, +) +from common.data_manager import DataManager, RobotActivityState +from common.threads.camera import camera_thread +from common.threads.ik_solver import ik_solver_thread +from common.threads.joint_state import joint_state_thread +from common.threads.quest_reader import quest_reader_thread +from meta_quest_teleop.reader import MetaQuestReader +from neuracore.core.input_devices.foot_pedal import FootPedal + +from pink_ik_solver import PinkIKSolver +from piper_controller import PiperController + + +def log_to_neuracore_on_change_callback( + name: str, value: Any, timestamp: float +) -> None: + """Callback triggered on state changes to log data to Neuracore. + + Args: + name: Name of the data stream. + value: Data value (float, array, or image). + timestamp: Time of the change. + """ + try: + if name == "log_joint_positions": + data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} + nc.log_joint_positions(data_dict, timestamp=timestamp) + elif name == "log_joint_target_positions": + data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} + nc.log_joint_target_positions(data_dict, timestamp=timestamp) + elif name == "log_parallel_gripper_open_amounts": + nc.log_parallel_gripper_open_amounts( + {GRIPPER_LOGGING_NAME: value}, timestamp=timestamp + ) + elif name == "log_parallel_gripper_target_open_amounts": + nc.log_parallel_gripper_target_open_amounts( + {GRIPPER_LOGGING_NAME: value}, timestamp=timestamp + ) + elif name == "log_rgb": + nc.log_rgb("rgb", value, timestamp=timestamp) + except Exception as e: + print(f"āš ļø Logging failed: {e}") + + +def toggle_robot_state() -> None: + """Toggle the robot's activity state between ENABLED and DISABLED.""" + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + robot_controller.graceful_stop() + data_manager.set_teleop_state(False, None, None) + print("āœ“ [ACTION] Robot DISABLED") + elif state == RobotActivityState.DISABLED: + if robot_controller.resume_robot(): + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + print("āœ“ [ACTION] Robot ENABLED") + else: + print("āœ— [ACTION] Failed to enable robot") + + +def move_robot_home() -> None: + """Command the robot to move to its neutral/home position.""" + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + print("šŸ  [ACTION] Moving to home position...") + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + data_manager.set_teleop_state(False, None, None) + if not robot_controller.move_to_home(): + print("āœ— [ACTION] Homing failed") + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + else: + print("āš ļø [ACTION] Cannot home: robot not enabled") + + +def toggle_recording() -> None: + """Start or stop a data recording session in Neuracore.""" + if not nc.is_recording(): + try: + nc.start_recording() + print("āœ“ [ACTION] Recording STARTED") + except Exception as e: + print(f"āœ— [ACTION] Recording failed to start: {e}") + else: + try: + nc.stop_recording() + print("āœ“ [ACTION] Recording STOPPED") + except Exception as e: + print(f"āœ— [ACTION] Recording failed to stop: {e}") + + +if __name__ == "__main__": + multiprocessing.set_start_method("spawn") + + parser = argparse.ArgumentParser(description="Combined Quest + Pedal Teleoperation") + parser.add_argument("--ip-address", type=str, help="Meta Quest IP") + parser.add_argument("--dataset-name", type=str, help="Neuracore dataset name") + args = parser.parse_args() + + print("=" * 60) + print("PIPER TELEOP: META QUEST + FOOT PEDALS") + print("=" * 60) + + # Neuracore Init + print("\nšŸ”§ Initializing Neuracore...") + nc.login() + nc.connect_robot( + robot_name="AgileX PiPER", urdf_path=str(URDF_PATH), overwrite=False + ) + + ds_name = args.dataset_name or f"pedal-teleop-{time.strftime('%H-%M-%S')}" + nc.create_dataset(name=ds_name, description="Quest + Pedal unified collection") + + # Shared State + data_manager = DataManager() + data_manager.set_on_change_callback(log_to_neuracore_on_change_callback) + data_manager.set_controller_filter_params( + CONTROLLER_MIN_CUTOFF, CONTROLLER_BETA, CONTROLLER_D_CUTOFF + ) + + # Robot Initialization + print("\nšŸ¤– Initializing Piper...") + robot_controller = PiperController(can_interface="can0", robot_rate=ROBOT_RATE) + robot_controller.start_control_loop() + + # Threads + print("\nšŸ“Š Starting Threads (JointState, QuestReader, IKSolver, Camera)...") + threading.Thread( + target=joint_state_thread, args=(data_manager, robot_controller), daemon=True + ).start() + + quest_reader = MetaQuestReader(ip_address=args.ip_address, port=5555, run=True) + threading.Thread( + target=quest_reader_thread, args=(data_manager, quest_reader), daemon=True + ).start() + + # Sync IK solver to current position + initial_joints = np.radians( + data_manager.get_current_joint_angles() or NEUTRAL_JOINT_ANGLES + ) + ik_solver = PinkIKSolver( + urdf_path=URDF_PATH, + end_effector_frame=GRIPPER_FRAME_NAME, + initial_configuration=initial_joints, + posture_cost_vector=np.array(POSTURE_COST_VECTOR), + ) + threading.Thread( + target=ik_solver_thread, args=(data_manager, ik_solver), daemon=True + ).start() + threading.Thread(target=camera_thread, args=(data_manager,), daemon=True).start() + + # Foot Pedal Initialization + print("\nāŒØļø Initializing Foot Pedals...") + pedal = FootPedal() + pedal.on("activate", toggle_robot_state) + pedal.on("home", move_robot_home) + pedal.on("record", toggle_recording) + pedal.start() + + print("\nāœ… SYSTEM ONLINE") + print("------------------------------------------------------------") + print("šŸŽ® QUEST CONTROLS: Hold GRIP to move, TRIGGER for gripper") + print("āŒØļø PEDAL CONTROLS: ACTIVATE (Enable), HOME (Reset), RECORD") + print("------------------------------------------------------------") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nšŸ‘‹ Shutting down...") + finally: + pedal.stop() + if nc.is_recording(): + nc.cancel_recording() + nc.logout() + data_manager.request_shutdown() + quest_reader.stop() + robot_controller.cleanup() diff --git a/examples/common/data_manager.py b/examples/common/data_manager.py index 9e159a1..d2a380c 100644 --- a/examples/common/data_manager.py +++ b/examples/common/data_manager.py @@ -93,6 +93,17 @@ def __init__(self) -> None: self.rgb_image: np.ndarray | None = None +class LeaderMappedState: + """Leader arm mapped state - joint angles and gripper from leader read_mapped().""" + + def __init__(self) -> None: + """Initialize LeaderMappedState with default values.""" + self._lock = threading.Lock() + + self.joint_angles: np.ndarray | None = None + self.gripper_open: float | None = None + + class DataManager: """Main state container coordinating all state groups. @@ -112,6 +123,7 @@ def __init__(self) -> None: self._robot_state = RobotState() self._ik_state = IKState() self._camera_state = CameraState() + self._leader_mapped_state = LeaderMappedState() # System state self._shutdown_event = threading.Event() @@ -530,6 +542,42 @@ def get_ik_success(self) -> bool: with self._ik_state._lock: return self._ik_state.success + # ============================================================================ + # Leader Mapped State Methods + # ============================================================================ + + def set_leader_mapped_state( + self, joint_angles: np.ndarray, gripper_open: float + ) -> None: + """Set leader-mapped joint angles and gripper (thread-safe). + + Args: + joint_angles: Follower-space joint angles from leader read_mapped(). + gripper_open: Gripper open amount in [0, 1]. + """ + with self._leader_mapped_state._lock: + self._leader_mapped_state.joint_angles = ( + joint_angles.copy() if joint_angles is not None else None + ) + self._leader_mapped_state.gripper_open = gripper_open + + def get_leader_mapped_state( + self, + ) -> tuple[np.ndarray | None, float | None]: + """Get leader-mapped joint angles and gripper (thread-safe). + + Returns: + Tuple of (joint_angles, gripper_open); either may be None if not set. + """ + with self._leader_mapped_state._lock: + angles = ( + self._leader_mapped_state.joint_angles.copy() + if self._leader_mapped_state.joint_angles is not None + else None + ) + gripper = self._leader_mapped_state.gripper_open + return (angles, gripper) + # ============================================================================ # System State Methods # ============================================================================ diff --git a/scripts/foot_pedal/README.md b/scripts/foot_pedal/README.md new file mode 100644 index 0000000..82ce39e --- /dev/null +++ b/scripts/foot_pedal/README.md @@ -0,0 +1,51 @@ +# Foot Pedal Control System + +The Foot Pedal system allows you to control the robot and data collection using a 3-pedal USB device (or any keyboard-mapped input). + +## Actions +- **ACTIVATE**: Enables the robot and aligns targets to current joint positions. +- **HOME**: Moves the robot arm to its predefined home position. +- **RECORD**: Toggles Neuracore data collection (Start/Stop recording). + +## Setup and Configuration + +### Remapping Keys +To map or change which pedal triggers which action, run the configuration script: + +```bash +./scripts/foot_pedal/run_pedal_config.sh +``` + +Follow the prompts in the terminal: +1. When prompted for 'ACTIVATE', press the first pedal. +2. When prompted for 'HOME', press the second pedal. +3. When prompted for 'RECORD', press the third pedal. + +The configuration is saved to `~/.neuracore/foot_pedal.json`. + +### Running the Demo +To start controlling the robot with the pedals, use the demo script: + +```bash +python3 scripts/foot_pedal/pedal_demo.py +``` + +## Developer Usage + +The pedal system is integrated into the Neuracore core library. You can use it in your own scripts as follows: + +```python +from neuracore.core.input_devices.foot_pedal import FootPedal + +pedal = FootPedal() +pedal.on("activate", your_activate_function) +pedal.on("home", your_home_function) +pedal.on("record", your_record_function) + +pedal.start() +``` + +## Troubleshooting +- If pedals are not detected, ensure the terminal window has focus. +- The `readchar` library must be installed for key detection. +- If using a serial-based pedal, use `list_potential_ports()` from `foot_pedal.py` to identify the device path. diff --git a/scripts/foot_pedal/pedal_demo.py b/scripts/foot_pedal/pedal_demo.py new file mode 100755 index 0000000..3fc87c0 --- /dev/null +++ b/scripts/foot_pedal/pedal_demo.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""Example of using the Foot Pedal to control the Piper robot.""" + +import sys +import time +from pathlib import Path + +# Add paths +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +import neuracore as nc +from neuracore.core.input_devices.foot_pedal import FootPedal + +from piper_controller import PiperController + + +def main() -> None: + """Run a demonstration of foot pedal control for the Piper robot.""" + print("=" * 60) + print("FOOT PEDAL ROBOT CONTROL DEMO") + print("=" * 60) + + # Initialize robot + print("\nšŸ¤– Initializing Piper robot...") + try: + robot = PiperController(can_interface="can0", debug_mode=False) + robot.start_control_loop() + except Exception as e: + print(f"āœ— Failed to initialize robot: {e}") + print("Running in simulation/mock mode (no hardware)...") + robot = None + + # Initialize Foot Pedal + print("\nāŒØļø Initializing Foot Pedal...") + pedal = FootPedal() + + if not any(pedal.mappings.values()): + print("āš ļø Foot pedal mappings are empty. Please run run_pedal_config.sh first.") + # Default fallback for demo + pedal.mappings = {"activate": "1", "home": "2", "record": "3"} + print(f"Using default demo mappings: {pedal.mappings}") + + # Connect to Neuracore (optional, for recording) + try: + nc.login() + print("āœ“ Connected to Neuracore") + except Exception: + print( + "āš ļø Neuracore connection failed. Recording actions will only print messages." + ) + + # Define callbacks + def on_activate() -> None: + """Handle the activate/enable action.""" + print("\nšŸš€ Foot Pedal: ACTIVATE pressed") + if robot: + if robot.resume_robot(): + print("āœ“ Robot enabled") + else: + print("āœ— Failed to enable robot") + else: + print("Action: Robot ACTIVATE (MOCK)") + + def on_home() -> None: + """Handle the move-to-home action.""" + print("\nšŸ  Foot Pedal: HOME pressed") + if robot: + robot.move_to_home() + print("āœ“ Robot moving to home") + else: + print("Action: Robot HOME (MOCK)") + + def on_record() -> None: + """Handle the toggle-recording action.""" + print("\nšŸ“¹ Foot Pedal: RECORD pressed") + try: + if not nc.is_recording(): + nc.start_recording() + print("āœ“ Recording started") + else: + nc.stop_recording() + print("āœ“ Recording stopped") + except Exception as e: + print(f"āš ļø Recording action failed: {e}") + print("Action: Toggle Recording (MOCK)") + + # Register callbacks + pedal.on("activate", on_activate) + pedal.on("home", on_home) + pedal.on("record", on_record) + + # Start pedal listener + pedal.start() + + print("\nāœ… Setup complete!") + print(f"ACTIVATE: Press '{pedal.mappings['activate']}' to enable robot") + print(f"HOME: Press '{pedal.mappings['home']}' to send robot home") + print(f"RECORD: Press '{pedal.mappings['record']}' to toggle recording") + print("\nāš ļø Press Ctrl+C to exit") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nšŸ‘‹ Stopping...") + finally: + pedal.stop() + if robot: + robot.cleanup() + + +if __name__ == "__main__": + main() diff --git a/scripts/foot_pedal/run_pedal_config.sh b/scripts/foot_pedal/run_pedal_config.sh new file mode 100755 index 0000000..39be813 --- /dev/null +++ b/scripts/foot_pedal/run_pedal_config.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Script to run foot pedal configuration + +# Get the directory where the script is located +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +REPO_ROOT="$( cd "$SCRIPT_DIR/../../.." &> /dev/null && pwd )" + +# Run the python config script +PYTHONPATH="$REPO_ROOT/neuracore:$PYTHONPATH" python3 "$REPO_ROOT/neuracore/neuracore/core/input_devices/pedal_config.py"