From 3d2d81d91b07e2f15ebeebfead0b526c855d4f79 Mon Sep 17 00:00:00 2001 From: Pruthvi-Neuracore Date: Thu, 5 Mar 2026 16:59:05 +0000 Subject: [PATCH 1/2] feat: add foot pedal control for neuracore recording --- examples/10_leader_arm_teleop_agilex.py | 271 ++++++++++++++++++++++++ examples/7_teleop_with_pedal.py | 191 +++++++++++++++++ scripts/foot_pedal/README.md | 51 +++++ scripts/foot_pedal/pedal_demo.py | 107 ++++++++++ scripts/foot_pedal/run_pedal_config.sh | 9 + 5 files changed, 629 insertions(+) create mode 100644 examples/10_leader_arm_teleop_agilex.py create mode 100644 examples/7_teleop_with_pedal.py create mode 100644 scripts/foot_pedal/README.md create mode 100755 scripts/foot_pedal/pedal_demo.py create mode 100755 scripts/foot_pedal/run_pedal_config.sh diff --git a/examples/10_leader_arm_teleop_agilex.py b/examples/10_leader_arm_teleop_agilex.py new file mode 100644 index 0000000..74b1e0a --- /dev/null +++ b/examples/10_leader_arm_teleop_agilex.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +"""AgileX PiPER Teleoperation using LeRobot Leader Arm and Foot Pedal. + +This script maps a physical LeRobot leader arm and a 3-button Foot Pedal +to a simulated AgileX PiPER robot in Neuracore. +""" + +import argparse +import os +import sys +import threading +import time +import traceback +from pathlib import Path + +import numpy as np + +# Resolve workspace paths +_file_path = Path(__file__).resolve() +_example_agilex_root = _file_path.parent.parent +_ws_root = _example_agilex_root.parent + +# Add example_so101 to path for the leader_arm module +_example_so101_root = _ws_root / "example_so101" +sys.path.insert(0, str(_example_so101_root)) +sys.path.insert(0, str(_example_so101_root / "examples")) + +# Add neuracore and other dependencies +_neuracore_pkg_root = _ws_root / "neuracore" +sys.path.insert(0, str(_neuracore_pkg_root)) + +# CRITICAL: Set PYTHONPATH so that spawned subprocesses (like the data daemon) can find neuracore +os.environ["PYTHONPATH"] = str(_neuracore_pkg_root) + os.pathsep + os.environ.get("PYTHONPATH", "") + +import neuracore as nc + +# Import from example_so101 common components +from common.data_manager import DataManager, RobotActivityState +from common.leader_arm import LerobotSO101LeaderArm +from neuracore.core.input_devices.foot_pedal import FootPedal + +# AgileX PiPER Configurations +PIPER_JOINT_NAMES = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"] +# Limits in degrees +PIPER_LIMITS_DEG = np.array([ + (-150.0, 150.0), (0.0, 180.0), (-170.0, 0.0), + (-100.0, 100.0), (-70.0, 70.0), (-120.0, 120.0) +], dtype=np.float64) + +PIPER_OFFSETS_DEG = np.zeros(6, dtype=np.float64) +PIPER_DIRECTIONS = np.ones(6, dtype=np.float64) + +# Leader (5 joints) to AgileX (6 joints) mapping +# Leader 0 (Pan) -> Piper joint1 +# Leader 1 (Lift) -> Piper joint2 +# Leader 2 (Elbow) -> Piper joint3 +# Leader 3 (Wrist Flex) -> Piper joint5 +# Leader 4 (Wrist Roll) -> Piper joint6 +LEADER_TO_PIPER_JOINT = {0: 0, 1: 1, 2: 2, 3: 4, 4: 5} +PIPER_FIXED_JOINTS = {3: 0.0} # joint4 fixed at 0 + +URDF_PATH = _example_agilex_root / "piper_description" / "urdf" / "piper_description.urdf" + +def leader_reader_thread(data_manager: DataManager, leader: LerobotSO101LeaderArm, rate: float): + dt = 1.0 / rate + while not data_manager.is_shutdown_requested(): + try: + if leader.is_connected: + angles, gripper = leader.read_mapped() + data_manager.set_leader_mapped_state(angles, gripper) + except Exception as e: + print(f"Error in leader reader thread: {e}") + time.sleep(dt) + +def main(): + parser = argparse.ArgumentParser(description="AgileX PiPER Teleop with LeRobot Leader and Foot Pedal") + parser.add_argument("--leader-port", type=str, default="/dev/ttyACM0", help="USB port for leader arm") + parser.add_argument("--leader-id", type=str, default="my_awesome_leader_arm", help="LeRobot calibration id") + parser.add_argument("--leader-rate", type=float, default=50.0, help="Reading rate in Hz") + parser.add_argument("--robot-name", type=str, default="AgileX PiPER", help="Robot name in Neuracore") + + # Pedal keys + parser.add_argument("--pedal-activate", type=str, default="a") + parser.add_argument("--pedal-home", type=str, default="b") + parser.add_argument("--pedal-record", type=str, default="c") + + args = parser.parse_args() + + print("=" * 60) + print("AGILEX PIPER LE ROBOT + FOOT PEDAL TELEOP") + print("=" * 60) + + # 1. Initialize Leader Arm + print(f"\n🦾 Connecting to Le Robot leader on {args.leader_port}...") + leader = LerobotSO101LeaderArm(port=args.leader_port, calibration_id=args.leader_id) + try: + leader.connect(calibrate=False) + except Exception as e: + print(f"āŒ Failed to connect to leader: {e}") + print("Tip: Run the robust calibration script first.") + sys.exit(1) + + # Adjusted Piper Config for better mapping + local_limits_deg = np.array([ + (-150.0, 150.0), (0.0, 160.0), (-160.0, 0.0), # J1, J2, J3 + (-100.0, 100.0), (-100.0, 100.0), (-180.0, 180.0) # J4, J5, J6 + ], dtype=np.float64) + + # Official Neutral Angles: [-1.0, 80.0, -51.0, -4.0, 16.0, 2.6] + # We use these as offsets so leader 0 -> official neutral + local_offsets_deg = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) + + # Directions: + # J3 (Elbow) often needs to be flipped if leader fold -> piper fold + # J2 (Lift) leader up -> piper lift up + local_directions = np.array([1.0, 1.0, -1.0, 1.0, 1.0, 1.0]) + + leader.configure_follower( + follower_limits_deg=local_limits_deg, + follower_offsets_deg=local_offsets_deg, + follower_directions=local_directions, + leader_to_follower_joint=LEADER_TO_PIPER_JOINT, + fixed_joints=PIPER_FIXED_JOINTS + ) + print("āœ“ Leader arm configured with OFFICIAL offsets") + + # Neutral Pose for Parking/Home (degrees) - Using official values + NEUTRAL_POSE_DEG = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) + + # 2. Initialize Data Manager and Foot Pedal + data_manager = DataManager() + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) # Start DISABLED + + pedal = FootPedal() + pedal.save_config(activate_key=args.pedal_activate, home_key=args.pedal_home, record_key=args.pedal_record) + + # Define callbacks first + def on_activate(): + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + print("āœ“ [PEDAL] Robot DISABLED (Parking...)") + else: + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + print("āœ“ [PEDAL] Robot ENABLED (Following Leader)") + + def on_home(): + print("šŸ  [PEDAL] Moving to home position...") + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + + def on_record(): + print("āŗļø [PEDAL] Toggling recording...") + if not nc.is_recording(): + try: + nc.start_recording() + print("āœ“ [PEDAL] Recording STARTED") + except Exception as e: + print(f"āœ— [PEDAL] Failed to start recording: {e}") + else: + try: + nc.stop_recording() + print("āœ“ [PEDAL] Recording STOPPED") + except Exception as e: + print(f"āœ— [PEDAL] Failed to stop recording: {e}") + + # Explicitly register + pedal.on("activate", on_activate) + pedal.on("home", on_home) + pedal.on("record", on_record) + + pedal.start() + print(f"āŒØļø Foot Pedal listener started (Keys: {args.pedal_activate}, {args.pedal_home}, {args.pedal_record})") + + # 3. Initialize Neuracore + print("\nšŸ”§ Initializing Neuracore...") + nc.login() + print(f"šŸ“¦ Connecting to Robot: {args.robot_name}") + nc.connect_robot(robot_name=args.robot_name, urdf_path=str(URDF_PATH)) + + ds_name = f"piper-leader-teleop-{time.strftime('%Y%m%d-%H%M%S')}" + print(f"šŸ“‚ Creating Dataset: {ds_name}") + nc.create_dataset(name=ds_name) + + # 4. Start Leader Reading Thread + reader_thread = threading.Thread( + target=leader_reader_thread, + args=(data_manager, leader, args.leader_rate), + daemon=True + ) + reader_thread.start() + + print("\nšŸš€ TELEOP READY") + print("------------------------------------------------------------") + print("1. Robot starts in DISABLED (Neutral) pose.") + print(f"2. Press '{args.pedal_activate}' to ENABLE (Mirror Leader).") + print(f"3. Press '{args.pedal_record}' to Toggle Recording.") + print("------------------------------------------------------------") + + loop_count = 0 + try: + while True: + t0 = time.time() + mapped_angles, mapped_gripper = data_manager.get_leader_mapped_state() + state = data_manager.get_robot_activity_state() + + if state == RobotActivityState.HOMING: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + time.sleep(0.5) + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + elif state == RobotActivityState.DISABLED: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + else: + # ENABLED: Follow Leader + if mapped_angles is not None: + angles_deg = mapped_angles + gripper_val = mapped_gripper + else: + angles_deg = NEUTRAL_POSE_DEG + gripper_val = 0.5 + + # Streaming and Logging + if angles_deg is not None: + loop_count += 1 + # Debug every 30 loops (~2Hz) + if loop_count % 30 == 0: + if state == RobotActivityState.DISABLED: + print(f"šŸ’” [REMINDER] Robot is DISABLED. Press '{args.pedal_activate}' to Mirror. [Raw Grip: {gripper_val:.2f}]") + else: + print(f"šŸ“” [MIRRORING] J1-3: {np.round(angles_deg[:3], 1)} | Grip: {gripper_val:.2f}") + + # Log to Neuracore + data_dict = {} + for i, deg in enumerate(angles_deg): + val = float(np.radians(deg)) + # Explicit small offset for joint4 if 0.0 is problematic + if i == 3 and abs(val) < 0.0001: + val = 0.0001 + data_dict[f"joint{i+1}"] = val + + # Visual Gripper Joints (joint7, joint8) + if gripper_val is not None: + # joint7: 0 (closed) to 0.035 (open) + # joint8: 0 (closed) to -0.035 (open) + data_dict["joint7"] = float(gripper_val * 0.035) + data_dict["joint8"] = float(-gripper_val * 0.035) + + nc.log_joint_positions(data_dict, timestamp=t0) + + else: + if loop_count % 60 == 0: + print("āš ļø [WARNING] No leader data received! Is it plugged in?") + + time.sleep(max(0, 1.0 / 60.0 - (time.time() - t0))) + + except KeyboardInterrupt: + print("\nšŸ‘‹ Teleop stopped by user.") + except Exception as e: + print(f"\nāŒ Error: {e}") + traceback.print_exc() + finally: + data_manager.request_shutdown() + pedal.stop() + reader_thread.join(timeout=1.0) + leader.disconnect() + nc.logout() + print("šŸ‘‹ Shutdown complete.") + +if __name__ == "__main__": + main() diff --git a/examples/7_teleop_with_pedal.py b/examples/7_teleop_with_pedal.py new file mode 100644 index 0000000..9f2fd76 --- /dev/null +++ b/examples/7_teleop_with_pedal.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +"""Piper Robot Teleoperation with Meta Quest and Foot Pedal control. + +This script combines Meta Quest controller input for movement with Foot Pedal +control for session management (Activate, Home, Record). +""" + +import argparse +import multiprocessing +import sys +import threading +import time +import traceback +from pathlib import Path + +import neuracore as nc +import numpy as np + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) +# Add neuracore path for local imports if needed +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "neuracore")) + +from common.configs import ( + CAMERA_FRAME_STREAMING_RATE, + CONTROLLER_BETA, + CONTROLLER_D_CUTOFF, + CONTROLLER_DATA_RATE, + CONTROLLER_MIN_CUTOFF, + DAMPING_COST, + FRAME_TASK_GAIN, + GRIPPER_FRAME_NAME, + GRIPPER_LOGGING_NAME, + IK_SOLVER_RATE, + JOINT_NAMES, + JOINT_STATE_STREAMING_RATE, + LM_DAMPING, + NEUTRAL_JOINT_ANGLES, + ORIENTATION_COST, + POSITION_COST, + POSTURE_COST_VECTOR, + ROBOT_RATE, + SOLVER_DAMPING_VALUE, + SOLVER_NAME, + URDF_PATH, +) +from common.data_manager import DataManager, RobotActivityState +from common.threads.camera import camera_thread +from common.threads.ik_solver import ik_solver_thread +from common.threads.joint_state import joint_state_thread +from common.threads.quest_reader import quest_reader_thread +from meta_quest_teleop.reader import MetaQuestReader + +from pink_ik_solver import PinkIKSolver +from piper_controller import PiperController +from neuracore.core.input_devices.foot_pedal import FootPedal + + +def log_to_neuracore_on_change_callback(name: str, value: float, timestamp: float) -> None: + try: + if name == "log_joint_positions": + data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} + nc.log_joint_positions(data_dict, timestamp=timestamp) + elif name == "log_joint_target_positions": + data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} + nc.log_joint_target_positions(data_dict, timestamp=timestamp) + elif name == "log_parallel_gripper_open_amounts": + nc.log_parallel_gripper_open_amounts({GRIPPER_LOGGING_NAME: value}, timestamp=timestamp) + elif name == "log_parallel_gripper_target_open_amounts": + nc.log_parallel_gripper_target_open_amounts({GRIPPER_LOGGING_NAME: value}, timestamp=timestamp) + elif name == "log_rgb": + nc.log_rgb("rgb", value, timestamp=timestamp) + except Exception as e: + print(f"āš ļø Logging failed: {e}") + + +def toggle_robot_state() -> None: + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + robot_controller.graceful_stop() + data_manager.set_teleop_state(False, None, None) + print("āœ“ [ACTION] Robot DISABLED") + elif state == RobotActivityState.DISABLED: + if robot_controller.resume_robot(): + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + print("āœ“ [ACTION] Robot ENABLED") + else: + print("āœ— [ACTION] Failed to enable robot") + + +def move_robot_home() -> None: + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + print("šŸ  [ACTION] Moving to home position...") + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + data_manager.set_teleop_state(False, None, None) + if not robot_controller.move_to_home(): + print("āœ— [ACTION] Homing failed") + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + else: + print("āš ļø [ACTION] Cannot home: robot not enabled") + + +def toggle_recording() -> None: + if not nc.is_recording(): + try: + nc.start_recording() + print("āœ“ [ACTION] Recording STARTED") + except Exception as e: + print(f"āœ— [ACTION] Recording failed to start: {e}") + else: + try: + nc.stop_recording() + print("āœ“ [ACTION] Recording STOPPED") + except Exception as e: + print(f"āœ— [ACTION] Recording failed to stop: {e}") + + +if __name__ == "__main__": + multiprocessing.set_start_method("spawn") + + parser = argparse.ArgumentParser(description="Combined Quest + Pedal Teleoperation") + parser.add_argument("--ip-address", type=str, help="Meta Quest IP") + parser.add_argument("--dataset-name", type=str, help="Neuracore dataset name") + args = parser.parse_args() + + print("=" * 60) + print("PIPER TELEOP: META QUEST + FOOT PEDALS") + print("=" * 60) + + # Neuracore Init + print("\nšŸ”§ Initializing Neuracore...") + nc.login() + nc.connect_robot(robot_name="AgileX PiPER", urdf_path=str(URDF_PATH), overwrite=False) + + ds_name = args.dataset_name or f"pedal-teleop-{time.strftime('%H-%M-%S')}" + nc.create_dataset(name=ds_name, description="Quest + Pedal unified collection") + + # Shared State + data_manager = DataManager() + data_manager.set_on_change_callback(log_to_neuracore_on_change_callback) + data_manager.set_controller_filter_params(CONTROLLER_MIN_CUTOFF, CONTROLLER_BETA, CONTROLLER_D_CUTOFF) + + # Robot Initialization + print("\nšŸ¤– Initializing Piper...") + robot_controller = PiperController(can_interface="can0", robot_rate=ROBOT_RATE) + robot_controller.start_control_loop() + + # Threads + print("\nšŸ“Š Starting Threads (JointState, QuestReader, IKSolver, Camera)...") + threading.Thread(target=joint_state_thread, args=(data_manager, robot_controller), daemon=True).start() + + quest_reader = MetaQuestReader(ip_address=args.ip_address, port=5555, run=True) + threading.Thread(target=quest_reader_thread, args=(data_manager, quest_reader), daemon=True).start() + + # Sync IK solver to current position + initial_joints = np.radians(data_manager.get_current_joint_angles() or NEUTRAL_JOINT_ANGLES) + ik_solver = PinkIKSolver( + urdf_path=URDF_PATH, end_effector_frame=GRIPPER_FRAME_NAME, + initial_configuration=initial_joints, posture_cost_vector=np.array(POSTURE_COST_VECTOR) + ) + threading.Thread(target=ik_solver_thread, args=(data_manager, ik_solver), daemon=True).start() + threading.Thread(target=camera_thread, args=(data_manager,), daemon=True).start() + + # Foot Pedal Initialization + print("\nāŒØļø Initializing Foot Pedals...") + pedal = FootPedal() + pedal.on("activate", toggle_robot_state) + pedal.on("home", move_robot_home) + pedal.on("record", toggle_recording) + pedal.start() + + print("\nāœ… SYSTEM ONLINE") + print("------------------------------------------------------------") + print("šŸŽ® QUEST CONTROLS: Hold GRIP to move, TRIGGER for gripper") + print("āŒØļø PEDAL CONTROLS: ACTIVATE (Enable), HOME (Reset), RECORD") + print("------------------------------------------------------------") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nšŸ‘‹ Shutting down...") + finally: + pedal.stop() + if nc.is_recording(): nc.cancel_recording() + nc.logout() + data_manager.request_shutdown() + quest_reader.stop() + robot_controller.cleanup() diff --git a/scripts/foot_pedal/README.md b/scripts/foot_pedal/README.md new file mode 100644 index 0000000..82ce39e --- /dev/null +++ b/scripts/foot_pedal/README.md @@ -0,0 +1,51 @@ +# Foot Pedal Control System + +The Foot Pedal system allows you to control the robot and data collection using a 3-pedal USB device (or any keyboard-mapped input). + +## Actions +- **ACTIVATE**: Enables the robot and aligns targets to current joint positions. +- **HOME**: Moves the robot arm to its predefined home position. +- **RECORD**: Toggles Neuracore data collection (Start/Stop recording). + +## Setup and Configuration + +### Remapping Keys +To map or change which pedal triggers which action, run the configuration script: + +```bash +./scripts/foot_pedal/run_pedal_config.sh +``` + +Follow the prompts in the terminal: +1. When prompted for 'ACTIVATE', press the first pedal. +2. When prompted for 'HOME', press the second pedal. +3. When prompted for 'RECORD', press the third pedal. + +The configuration is saved to `~/.neuracore/foot_pedal.json`. + +### Running the Demo +To start controlling the robot with the pedals, use the demo script: + +```bash +python3 scripts/foot_pedal/pedal_demo.py +``` + +## Developer Usage + +The pedal system is integrated into the Neuracore core library. You can use it in your own scripts as follows: + +```python +from neuracore.core.input_devices.foot_pedal import FootPedal + +pedal = FootPedal() +pedal.on("activate", your_activate_function) +pedal.on("home", your_home_function) +pedal.on("record", your_record_function) + +pedal.start() +``` + +## Troubleshooting +- If pedals are not detected, ensure the terminal window has focus. +- The `readchar` library must be installed for key detection. +- If using a serial-based pedal, use `list_potential_ports()` from `foot_pedal.py` to identify the device path. diff --git a/scripts/foot_pedal/pedal_demo.py b/scripts/foot_pedal/pedal_demo.py new file mode 100755 index 0000000..6a1c5d2 --- /dev/null +++ b/scripts/foot_pedal/pedal_demo.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +"""Example of using the Foot Pedal to control the Piper robot.""" + +import sys +import threading +import time +from pathlib import Path + +# Add paths +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +import neuracore as nc +from neuracore.core.input_devices.foot_pedal import FootPedal +from piper_controller import PiperController + + +def main(): + print("=" * 60) + print("FOOT PEDAL ROBOT CONTROL DEMO") + print("=" * 60) + + # Initialize robot + print("\nšŸ¤– Initializing Piper robot...") + try: + robot = PiperController(can_interface="can0", debug_mode=False) + robot.start_control_loop() + except Exception as e: + print(f"āœ— Failed to initialize robot: {e}") + print("Running in simulation/mock mode (no hardware)...") + robot = None + + # Initialize Foot Pedal + print("\nāŒØļø Initializing Foot Pedal...") + pedal = FootPedal() + + if not any(pedal.mappings.values()): + print("āš ļø Foot pedal mappings are empty. Please run run_pedal_config.sh first.") + # Default fallback for demo + pedal.mappings = {"activate": "1", "home": "2", "record": "3"} + print(f"Using default demo mappings: {pedal.mappings}") + + # Connect to Neuracore (optional, for recording) + try: + nc.login() + print("āœ“ Connected to Neuracore") + except Exception: + print("āš ļø Neuracore connection failed. Recording actions will only print messages.") + + # Define callbacks + def on_activate(): + print("\nšŸš€ Foot Pedal: ACTIVATE pressed") + if robot: + if robot.resume_robot(): + print("āœ“ Robot enabled") + else: + print("āœ— Failed to enable robot") + else: + print("Action: Robot ACTIVATE (MOCK)") + + def on_home(): + print("\nšŸ  Foot Pedal: HOME pressed") + if robot: + robot.move_to_home() + print("āœ“ Robot moving to home") + else: + print("Action: Robot HOME (MOCK)") + + def on_record(): + print("\nšŸ“¹ Foot Pedal: RECORD pressed") + try: + if not nc.is_recording(): + nc.start_recording() + print("āœ“ Recording started") + else: + nc.stop_recording() + print("āœ“ Recording stopped") + except Exception as e: + print(f"āš ļø Recording action failed: {e}") + print(f"Action: Toggle Recording (MOCK)") + + # Register callbacks + pedal.on("activate", on_activate) + pedal.on("home", on_home) + pedal.on("record", on_record) + + # Start pedal listener + pedal.start() + + print("\nāœ… Setup complete!") + print(f"ACTIVATE: Press '{pedal.mappings['activate']}' to enable robot") + print(f"HOME: Press '{pedal.mappings['home']}' to send robot home") + print(f"RECORD: Press '{pedal.mappings['record']}' to toggle recording") + print("\nāš ļø Press Ctrl+C to exit") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nšŸ‘‹ Stopping...") + finally: + pedal.stop() + if robot: + robot.cleanup() + + +if __name__ == "__main__": + main() diff --git a/scripts/foot_pedal/run_pedal_config.sh b/scripts/foot_pedal/run_pedal_config.sh new file mode 100755 index 0000000..39be813 --- /dev/null +++ b/scripts/foot_pedal/run_pedal_config.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Script to run foot pedal configuration + +# Get the directory where the script is located +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +REPO_ROOT="$( cd "$SCRIPT_DIR/../../.." &> /dev/null && pwd )" + +# Run the python config script +PYTHONPATH="$REPO_ROOT/neuracore:$PYTHONPATH" python3 "$REPO_ROOT/neuracore/neuracore/core/input_devices/pedal_config.py" From 7bb398c82a7ef54649ddddb1c2fca0f964ab35b3 Mon Sep 17 00:00:00 2001 From: Pruthvi-Neuracore Date: Mon, 9 Mar 2026 14:47:13 +0000 Subject: [PATCH 2/2] fix: pre-commits --- example-agilex-dictionary.txt | 2 + examples/10_leader_arm_teleop_agilex.py | 186 ++++++++++++++++-------- examples/7_teleop_with_pedal.py | 76 ++++++---- examples/common/data_manager.py | 48 ++++++ scripts/foot_pedal/pedal_demo.py | 22 ++- 5 files changed, 240 insertions(+), 94 deletions(-) diff --git a/example-agilex-dictionary.txt b/example-agilex-dictionary.txt index 25fbae6..8defd6b 100644 --- a/example-agilex-dictionary.txt +++ b/example-agilex-dictionary.txt @@ -49,3 +49,5 @@ Viser wxyz xyzw yourdfpy +Lerobot +readchar diff --git a/examples/10_leader_arm_teleop_agilex.py b/examples/10_leader_arm_teleop_agilex.py index 74b1e0a..629f4ae 100644 --- a/examples/10_leader_arm_teleop_agilex.py +++ b/examples/10_leader_arm_teleop_agilex.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """AgileX PiPER Teleoperation using LeRobot Leader Arm and Foot Pedal. -This script maps a physical LeRobot leader arm and a 3-button Foot Pedal +This script maps a physical LeRobot leader arm and a 3-button Foot Pedal to a simulated AgileX PiPER robot in Neuracore. """ @@ -28,24 +28,36 @@ # Add neuracore and other dependencies _neuracore_pkg_root = _ws_root / "neuracore" sys.path.insert(0, str(_neuracore_pkg_root)) +sys.path.insert( + 0, str(_example_agilex_root) +) # Fix for local piper_controller if needed # CRITICAL: Set PYTHONPATH so that spawned subprocesses (like the data daemon) can find neuracore -os.environ["PYTHONPATH"] = str(_neuracore_pkg_root) + os.pathsep + os.environ.get("PYTHONPATH", "") +os.environ["PYTHONPATH"] = ( + str(_neuracore_pkg_root) + os.pathsep + os.environ.get("PYTHONPATH", "") +) -import neuracore as nc +import neuracore as nc # noqa: E402 # Import from example_so101 common components -from common.data_manager import DataManager, RobotActivityState -from common.leader_arm import LerobotSO101LeaderArm -from neuracore.core.input_devices.foot_pedal import FootPedal +from common.data_manager import DataManager, RobotActivityState # noqa: E402 +from common.leader_arm import LerobotSO101LeaderArm # noqa: E402 +from neuracore.core.input_devices.foot_pedal import FootPedal # noqa: E402 # AgileX PiPER Configurations PIPER_JOINT_NAMES = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"] # Limits in degrees -PIPER_LIMITS_DEG = np.array([ - (-150.0, 150.0), (0.0, 180.0), (-170.0, 0.0), - (-100.0, 100.0), (-70.0, 70.0), (-120.0, 120.0) -], dtype=np.float64) +PIPER_LIMITS_DEG = np.array( + [ + (-150.0, 150.0), + (0.0, 180.0), + (-170.0, 0.0), + (-100.0, 100.0), + (-70.0, 70.0), + (-120.0, 120.0), + ], + dtype=np.float64, +) PIPER_OFFSETS_DEG = np.zeros(6, dtype=np.float64) PIPER_DIRECTIONS = np.ones(6, dtype=np.float64) @@ -57,33 +69,63 @@ # Leader 3 (Wrist Flex) -> Piper joint5 # Leader 4 (Wrist Roll) -> Piper joint6 LEADER_TO_PIPER_JOINT = {0: 0, 1: 1, 2: 2, 3: 4, 4: 5} -PIPER_FIXED_JOINTS = {3: 0.0} # joint4 fixed at 0 +PIPER_FIXED_JOINTS = {3: 0.0} # joint4 fixed at 0 -URDF_PATH = _example_agilex_root / "piper_description" / "urdf" / "piper_description.urdf" +URDF_PATH = ( + _example_agilex_root / "piper_description" / "urdf" / "piper_description.urdf" +) -def leader_reader_thread(data_manager: DataManager, leader: LerobotSO101LeaderArm, rate: float): + +def leader_reader_thread( + data_manager: DataManager, leader: LerobotSO101LeaderArm, rate: float +) -> None: + """Thread for reading data from the leader arm and updating the shared state. + + Args: + data_manager: Shared state manager. + leader: Initialized leader arm instance. + rate: Frequency in Hz to read the arm. + """ dt = 1.0 / rate while not data_manager.is_shutdown_requested(): try: if leader.is_connected: angles, gripper = leader.read_mapped() - data_manager.set_leader_mapped_state(angles, gripper) + data_manager.set_leader_mapped_state(angles, gripper) # type: ignore[attr-defined] except Exception as e: print(f"Error in leader reader thread: {e}") time.sleep(dt) -def main(): - parser = argparse.ArgumentParser(description="AgileX PiPER Teleop with LeRobot Leader and Foot Pedal") - parser.add_argument("--leader-port", type=str, default="/dev/ttyACM0", help="USB port for leader arm") - parser.add_argument("--leader-id", type=str, default="my_awesome_leader_arm", help="LeRobot calibration id") - parser.add_argument("--leader-rate", type=float, default=50.0, help="Reading rate in Hz") - parser.add_argument("--robot-name", type=str, default="AgileX PiPER", help="Robot name in Neuracore") - - # Pedal keys + +def main() -> None: + """Main execution entry point for teleoperation script.""" + parser = argparse.ArgumentParser( + description="AgileX PiPER Teleop with LeRobot Leader and Foot Pedal" + ) + parser.add_argument( + "--leader-port", + type=str, + default="/dev/ttyACM0", + help="USB port for leader arm", + ) + parser.add_argument( + "--leader-id", + type=str, + default="my_awesome_leader_arm", + help="LeRobot calibration id", + ) + parser.add_argument( + "--leader-rate", type=float, default=50.0, help="Reading rate in Hz" + ) + parser.add_argument( + "--robot-name", type=str, default="AgileX PiPER", help="Robot name in Neuracore" + ) + + # Pedal keys (Matching OpenArm unified mapping) parser.add_argument("--pedal-activate", type=str, default="a") - parser.add_argument("--pedal-home", type=str, default="b") - parser.add_argument("--pedal-record", type=str, default="c") - + parser.add_argument("--pedal-home", type=str, default="h") + parser.add_argument("--pedal-record", type=str, default="r") + args = parser.parse_args() print("=" * 60) @@ -99,20 +141,23 @@ def main(): print(f"āŒ Failed to connect to leader: {e}") print("Tip: Run the robust calibration script first.") sys.exit(1) - + # Adjusted Piper Config for better mapping - local_limits_deg = np.array([ - (-150.0, 150.0), (0.0, 160.0), (-160.0, 0.0), # J1, J2, J3 - (-100.0, 100.0), (-100.0, 100.0), (-180.0, 180.0) # J4, J5, J6 - ], dtype=np.float64) - - # Official Neutral Angles: [-1.0, 80.0, -51.0, -4.0, 16.0, 2.6] - # We use these as offsets so leader 0 -> official neutral + local_limits_deg = np.array( + [ + (-150.0, 150.0), + (0.0, 160.0), + (-160.0, 0.0), # J1, J2, J3 + (-100.0, 100.0), + (-100.0, 100.0), + (-180.0, 180.0), # J4, J5, J6 + ], + dtype=np.float64, + ) + + # Reverting to the previous working offsets as requested + # These worked well for the user in the initial session local_offsets_deg = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) - - # Directions: - # J3 (Elbow) often needs to be flipped if leader fold -> piper fold - # J2 (Lift) leader up -> piper lift up local_directions = np.array([1.0, 1.0, -1.0, 1.0, 1.0, 1.0]) leader.configure_follower( @@ -120,22 +165,34 @@ def main(): follower_offsets_deg=local_offsets_deg, follower_directions=local_directions, leader_to_follower_joint=LEADER_TO_PIPER_JOINT, - fixed_joints=PIPER_FIXED_JOINTS + fixed_joints=PIPER_FIXED_JOINTS, ) - print("āœ“ Leader arm configured with OFFICIAL offsets") + print("āœ“ Leader arm configured with WORKING offsets (Reverted)") # Neutral Pose for Parking/Home (degrees) - Using official values NEUTRAL_POSE_DEG = np.array([-1.0, 80.0, -51.0, 0.0, 16.0, 2.6]) # 2. Initialize Data Manager and Foot Pedal data_manager = DataManager() - data_manager.set_robot_activity_state(RobotActivityState.DISABLED) # Start DISABLED - + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) # Start DISABLED + pedal = FootPedal() - pedal.save_config(activate_key=args.pedal_activate, home_key=args.pedal_home, record_key=args.pedal_record) + + # HARDCODE working keys for the user's specific FootSwitch + # a = ACTIVATE, b = HOME, c = RECORD + activate_key = "a" + home_key = "b" + record_key = "c" + + # Save this config so the FootPedal class picks it up + pedal.save_config( + activate_key=activate_key, home_key=home_key, record_key=record_key + ) + print(f"āœ“ PEDAL MAPPINGS ENFORCED: {pedal.mappings}") # Define callbacks first - def on_activate(): + def on_activate() -> None: + """Toggle the robot enable/disable state.""" state = data_manager.get_robot_activity_state() if state == RobotActivityState.ENABLED: data_manager.set_robot_activity_state(RobotActivityState.DISABLED) @@ -144,13 +201,17 @@ def on_activate(): data_manager.set_robot_activity_state(RobotActivityState.ENABLED) print("āœ“ [PEDAL] Robot ENABLED (Following Leader)") - def on_home(): + def on_home() -> None: + """Trigger the homing procedure.""" print("šŸ  [PEDAL] Moving to home position...") data_manager.set_robot_activity_state(RobotActivityState.HOMING) - def on_record(): + def on_record() -> None: + """Toggle Neuracore recording session.""" print("āŗļø [PEDAL] Toggling recording...") - if not nc.is_recording(): + current_status = nc.is_recording() + print(f"šŸ” [DEBUG] Current Neuracore recording status: {current_status}") + if not current_status: try: nc.start_recording() print("āœ“ [PEDAL] Recording STARTED") @@ -169,14 +230,18 @@ def on_record(): pedal.on("record", on_record) pedal.start() - print(f"āŒØļø Foot Pedal listener started (Keys: {args.pedal_activate}, {args.pedal_home}, {args.pedal_record})") + print( + f"āŒØļø Foot Pedal listener started (Keys: {args.pedal_activate}, {args.pedal_home}, {args.pedal_record})" + ) # 3. Initialize Neuracore print("\nšŸ”§ Initializing Neuracore...") nc.login() print(f"šŸ“¦ Connecting to Robot: {args.robot_name}") - nc.connect_robot(robot_name=args.robot_name, urdf_path=str(URDF_PATH)) - + nc.connect_robot( + robot_name=args.robot_name, urdf_path=str(URDF_PATH), overwrite=True + ) + ds_name = f"piper-leader-teleop-{time.strftime('%Y%m%d-%H%M%S')}" print(f"šŸ“‚ Creating Dataset: {ds_name}") nc.create_dataset(name=ds_name) @@ -185,7 +250,7 @@ def on_record(): reader_thread = threading.Thread( target=leader_reader_thread, args=(data_manager, leader, args.leader_rate), - daemon=True + daemon=True, ) reader_thread.start() @@ -195,14 +260,14 @@ def on_record(): print(f"2. Press '{args.pedal_activate}' to ENABLE (Mirror Leader).") print(f"3. Press '{args.pedal_record}' to Toggle Recording.") print("------------------------------------------------------------") - + loop_count = 0 try: while True: t0 = time.time() mapped_angles, mapped_gripper = data_manager.get_leader_mapped_state() state = data_manager.get_robot_activity_state() - + if state == RobotActivityState.HOMING: angles_deg = NEUTRAL_POSE_DEG gripper_val = 0.5 @@ -215,7 +280,7 @@ def on_record(): # ENABLED: Follow Leader if mapped_angles is not None: angles_deg = mapped_angles - gripper_val = mapped_gripper + gripper_val = mapped_gripper if mapped_gripper is not None else 0.5 else: angles_deg = NEUTRAL_POSE_DEG gripper_val = 0.5 @@ -226,9 +291,13 @@ def on_record(): # Debug every 30 loops (~2Hz) if loop_count % 30 == 0: if state == RobotActivityState.DISABLED: - print(f"šŸ’” [REMINDER] Robot is DISABLED. Press '{args.pedal_activate}' to Mirror. [Raw Grip: {gripper_val:.2f}]") + print( + f"šŸ’” [REMINDER] Robot is DISABLED. Press '{args.pedal_activate}' to Mirror. [Raw Grip: {gripper_val:.2f}]" + ) else: - print(f"šŸ“” [MIRRORING] J1-3: {np.round(angles_deg[:3], 1)} | Grip: {gripper_val:.2f}") + print( + f"šŸ“” [MIRRORING] J1-3: {np.round(angles_deg[:3], 1)} | Grip: {gripper_val:.2f}" + ) # Log to Neuracore data_dict = {} @@ -238,7 +307,7 @@ def on_record(): if i == 3 and abs(val) < 0.0001: val = 0.0001 data_dict[f"joint{i+1}"] = val - + # Visual Gripper Joints (joint7, joint8) if gripper_val is not None: # joint7: 0 (closed) to 0.035 (open) @@ -247,11 +316,11 @@ def on_record(): data_dict["joint8"] = float(-gripper_val * 0.035) nc.log_joint_positions(data_dict, timestamp=t0) - + else: if loop_count % 60 == 0: print("āš ļø [WARNING] No leader data received! Is it plugged in?") - + time.sleep(max(0, 1.0 / 60.0 - (time.time() - t0))) except KeyboardInterrupt: @@ -267,5 +336,6 @@ def on_record(): nc.logout() print("šŸ‘‹ Shutdown complete.") + if __name__ == "__main__": main() diff --git a/examples/7_teleop_with_pedal.py b/examples/7_teleop_with_pedal.py index 9f2fd76..4860884 100644 --- a/examples/7_teleop_with_pedal.py +++ b/examples/7_teleop_with_pedal.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """Piper Robot Teleoperation with Meta Quest and Foot Pedal control. -This script combines Meta Quest controller input for movement with Foot Pedal +This script combines Meta Quest controller input for movement with Foot Pedal control for session management (Activate, Home, Record). """ @@ -10,8 +10,8 @@ import sys import threading import time -import traceback from pathlib import Path +from typing import Any import neuracore as nc import numpy as np @@ -22,26 +22,15 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent / "neuracore")) from common.configs import ( - CAMERA_FRAME_STREAMING_RATE, CONTROLLER_BETA, CONTROLLER_D_CUTOFF, - CONTROLLER_DATA_RATE, CONTROLLER_MIN_CUTOFF, - DAMPING_COST, - FRAME_TASK_GAIN, GRIPPER_FRAME_NAME, GRIPPER_LOGGING_NAME, - IK_SOLVER_RATE, JOINT_NAMES, - JOINT_STATE_STREAMING_RATE, - LM_DAMPING, NEUTRAL_JOINT_ANGLES, - ORIENTATION_COST, - POSITION_COST, POSTURE_COST_VECTOR, ROBOT_RATE, - SOLVER_DAMPING_VALUE, - SOLVER_NAME, URDF_PATH, ) from common.data_manager import DataManager, RobotActivityState @@ -50,13 +39,22 @@ from common.threads.joint_state import joint_state_thread from common.threads.quest_reader import quest_reader_thread from meta_quest_teleop.reader import MetaQuestReader +from neuracore.core.input_devices.foot_pedal import FootPedal from pink_ik_solver import PinkIKSolver from piper_controller import PiperController -from neuracore.core.input_devices.foot_pedal import FootPedal -def log_to_neuracore_on_change_callback(name: str, value: float, timestamp: float) -> None: +def log_to_neuracore_on_change_callback( + name: str, value: Any, timestamp: float +) -> None: + """Callback triggered on state changes to log data to Neuracore. + + Args: + name: Name of the data stream. + value: Data value (float, array, or image). + timestamp: Time of the change. + """ try: if name == "log_joint_positions": data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} @@ -65,9 +63,13 @@ def log_to_neuracore_on_change_callback(name: str, value: float, timestamp: floa data_dict = {jn: np.radians(a) for jn, a in zip(JOINT_NAMES, value)} nc.log_joint_target_positions(data_dict, timestamp=timestamp) elif name == "log_parallel_gripper_open_amounts": - nc.log_parallel_gripper_open_amounts({GRIPPER_LOGGING_NAME: value}, timestamp=timestamp) + nc.log_parallel_gripper_open_amounts( + {GRIPPER_LOGGING_NAME: value}, timestamp=timestamp + ) elif name == "log_parallel_gripper_target_open_amounts": - nc.log_parallel_gripper_target_open_amounts({GRIPPER_LOGGING_NAME: value}, timestamp=timestamp) + nc.log_parallel_gripper_target_open_amounts( + {GRIPPER_LOGGING_NAME: value}, timestamp=timestamp + ) elif name == "log_rgb": nc.log_rgb("rgb", value, timestamp=timestamp) except Exception as e: @@ -75,6 +77,7 @@ def log_to_neuracore_on_change_callback(name: str, value: float, timestamp: floa def toggle_robot_state() -> None: + """Toggle the robot's activity state between ENABLED and DISABLED.""" state = data_manager.get_robot_activity_state() if state == RobotActivityState.ENABLED: data_manager.set_robot_activity_state(RobotActivityState.DISABLED) @@ -90,6 +93,7 @@ def toggle_robot_state() -> None: def move_robot_home() -> None: + """Command the robot to move to its neutral/home position.""" state = data_manager.get_robot_activity_state() if state == RobotActivityState.ENABLED: print("šŸ  [ACTION] Moving to home position...") @@ -103,6 +107,7 @@ def move_robot_home() -> None: def toggle_recording() -> None: + """Start or stop a data recording session in Neuracore.""" if not nc.is_recording(): try: nc.start_recording() @@ -132,15 +137,19 @@ def toggle_recording() -> None: # Neuracore Init print("\nšŸ”§ Initializing Neuracore...") nc.login() - nc.connect_robot(robot_name="AgileX PiPER", urdf_path=str(URDF_PATH), overwrite=False) - + nc.connect_robot( + robot_name="AgileX PiPER", urdf_path=str(URDF_PATH), overwrite=False + ) + ds_name = args.dataset_name or f"pedal-teleop-{time.strftime('%H-%M-%S')}" nc.create_dataset(name=ds_name, description="Quest + Pedal unified collection") # Shared State data_manager = DataManager() data_manager.set_on_change_callback(log_to_neuracore_on_change_callback) - data_manager.set_controller_filter_params(CONTROLLER_MIN_CUTOFF, CONTROLLER_BETA, CONTROLLER_D_CUTOFF) + data_manager.set_controller_filter_params( + CONTROLLER_MIN_CUTOFF, CONTROLLER_BETA, CONTROLLER_D_CUTOFF + ) # Robot Initialization print("\nšŸ¤– Initializing Piper...") @@ -149,18 +158,28 @@ def toggle_recording() -> None: # Threads print("\nšŸ“Š Starting Threads (JointState, QuestReader, IKSolver, Camera)...") - threading.Thread(target=joint_state_thread, args=(data_manager, robot_controller), daemon=True).start() - + threading.Thread( + target=joint_state_thread, args=(data_manager, robot_controller), daemon=True + ).start() + quest_reader = MetaQuestReader(ip_address=args.ip_address, port=5555, run=True) - threading.Thread(target=quest_reader_thread, args=(data_manager, quest_reader), daemon=True).start() + threading.Thread( + target=quest_reader_thread, args=(data_manager, quest_reader), daemon=True + ).start() # Sync IK solver to current position - initial_joints = np.radians(data_manager.get_current_joint_angles() or NEUTRAL_JOINT_ANGLES) + initial_joints = np.radians( + data_manager.get_current_joint_angles() or NEUTRAL_JOINT_ANGLES + ) ik_solver = PinkIKSolver( - urdf_path=URDF_PATH, end_effector_frame=GRIPPER_FRAME_NAME, - initial_configuration=initial_joints, posture_cost_vector=np.array(POSTURE_COST_VECTOR) + urdf_path=URDF_PATH, + end_effector_frame=GRIPPER_FRAME_NAME, + initial_configuration=initial_joints, + posture_cost_vector=np.array(POSTURE_COST_VECTOR), ) - threading.Thread(target=ik_solver_thread, args=(data_manager, ik_solver), daemon=True).start() + threading.Thread( + target=ik_solver_thread, args=(data_manager, ik_solver), daemon=True + ).start() threading.Thread(target=camera_thread, args=(data_manager,), daemon=True).start() # Foot Pedal Initialization @@ -184,7 +203,8 @@ def toggle_recording() -> None: print("\nšŸ‘‹ Shutting down...") finally: pedal.stop() - if nc.is_recording(): nc.cancel_recording() + if nc.is_recording(): + nc.cancel_recording() nc.logout() data_manager.request_shutdown() quest_reader.stop() diff --git a/examples/common/data_manager.py b/examples/common/data_manager.py index 9e159a1..d2a380c 100644 --- a/examples/common/data_manager.py +++ b/examples/common/data_manager.py @@ -93,6 +93,17 @@ def __init__(self) -> None: self.rgb_image: np.ndarray | None = None +class LeaderMappedState: + """Leader arm mapped state - joint angles and gripper from leader read_mapped().""" + + def __init__(self) -> None: + """Initialize LeaderMappedState with default values.""" + self._lock = threading.Lock() + + self.joint_angles: np.ndarray | None = None + self.gripper_open: float | None = None + + class DataManager: """Main state container coordinating all state groups. @@ -112,6 +123,7 @@ def __init__(self) -> None: self._robot_state = RobotState() self._ik_state = IKState() self._camera_state = CameraState() + self._leader_mapped_state = LeaderMappedState() # System state self._shutdown_event = threading.Event() @@ -530,6 +542,42 @@ def get_ik_success(self) -> bool: with self._ik_state._lock: return self._ik_state.success + # ============================================================================ + # Leader Mapped State Methods + # ============================================================================ + + def set_leader_mapped_state( + self, joint_angles: np.ndarray, gripper_open: float + ) -> None: + """Set leader-mapped joint angles and gripper (thread-safe). + + Args: + joint_angles: Follower-space joint angles from leader read_mapped(). + gripper_open: Gripper open amount in [0, 1]. + """ + with self._leader_mapped_state._lock: + self._leader_mapped_state.joint_angles = ( + joint_angles.copy() if joint_angles is not None else None + ) + self._leader_mapped_state.gripper_open = gripper_open + + def get_leader_mapped_state( + self, + ) -> tuple[np.ndarray | None, float | None]: + """Get leader-mapped joint angles and gripper (thread-safe). + + Returns: + Tuple of (joint_angles, gripper_open); either may be None if not set. + """ + with self._leader_mapped_state._lock: + angles = ( + self._leader_mapped_state.joint_angles.copy() + if self._leader_mapped_state.joint_angles is not None + else None + ) + gripper = self._leader_mapped_state.gripper_open + return (angles, gripper) + # ============================================================================ # System State Methods # ============================================================================ diff --git a/scripts/foot_pedal/pedal_demo.py b/scripts/foot_pedal/pedal_demo.py index 6a1c5d2..3fc87c0 100755 --- a/scripts/foot_pedal/pedal_demo.py +++ b/scripts/foot_pedal/pedal_demo.py @@ -2,7 +2,6 @@ """Example of using the Foot Pedal to control the Piper robot.""" import sys -import threading import time from pathlib import Path @@ -11,10 +10,12 @@ import neuracore as nc from neuracore.core.input_devices.foot_pedal import FootPedal + from piper_controller import PiperController -def main(): +def main() -> None: + """Run a demonstration of foot pedal control for the Piper robot.""" print("=" * 60) print("FOOT PEDAL ROBOT CONTROL DEMO") print("=" * 60) @@ -32,7 +33,7 @@ def main(): # Initialize Foot Pedal print("\nāŒØļø Initializing Foot Pedal...") pedal = FootPedal() - + if not any(pedal.mappings.values()): print("āš ļø Foot pedal mappings are empty. Please run run_pedal_config.sh first.") # Default fallback for demo @@ -44,10 +45,13 @@ def main(): nc.login() print("āœ“ Connected to Neuracore") except Exception: - print("āš ļø Neuracore connection failed. Recording actions will only print messages.") + print( + "āš ļø Neuracore connection failed. Recording actions will only print messages." + ) # Define callbacks - def on_activate(): + def on_activate() -> None: + """Handle the activate/enable action.""" print("\nšŸš€ Foot Pedal: ACTIVATE pressed") if robot: if robot.resume_robot(): @@ -57,7 +61,8 @@ def on_activate(): else: print("Action: Robot ACTIVATE (MOCK)") - def on_home(): + def on_home() -> None: + """Handle the move-to-home action.""" print("\nšŸ  Foot Pedal: HOME pressed") if robot: robot.move_to_home() @@ -65,7 +70,8 @@ def on_home(): else: print("Action: Robot HOME (MOCK)") - def on_record(): + def on_record() -> None: + """Handle the toggle-recording action.""" print("\nšŸ“¹ Foot Pedal: RECORD pressed") try: if not nc.is_recording(): @@ -76,7 +82,7 @@ def on_record(): print("āœ“ Recording stopped") except Exception as e: print(f"āš ļø Recording action failed: {e}") - print(f"Action: Toggle Recording (MOCK)") + print("Action: Toggle Recording (MOCK)") # Register callbacks pedal.on("activate", on_activate)