Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates the planning API to use a structured PlanState object (instead of raw dicts) and adapts the TOPPRA planner and motion generator to the new interface.
Changes:
- Introduces
PlanState(plusMoveType/MovePart) in planner utils. - Refactors
BasePlannerandToppraPlannerto acceptPlanStateinputs and initializes planners via**kwargs. - Updates
MotionGeneratorto constructPlanStateobjects before invoking the planner.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
embodichain/lab/sim/planners/utils.py |
Adds PlanState and related enums used to represent planning inputs more explicitly. |
embodichain/lab/sim/planners/base_planner.py |
Updates base planner API to accept PlanState and switches init to **kwargs. |
embodichain/lab/sim/planners/toppra_planner.py |
Updates TOPPRA planner to accept PlanState and adjusts waypoint handling accordingly. |
embodichain/lab/sim/planners/motion_generator.py |
Packs dict-based states into PlanState before calling the underlying planner. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| move_part: MovePart = MovePart.LEFT | ||
| xpos: torch.Tensor = None | ||
| qpos: torch.Tensor = None | ||
| qacc: torch.Tensor = None | ||
| qvel: torch.Tensor = None |
There was a problem hiding this comment.
PlanState fields are annotated as torch.Tensor but default to None (and are also populated with lists in MotionGenerator.plan). Consider widening these types (e.g., torch.Tensor | np.ndarray | Sequence[float] | None) and/or using Optional[...] to match the actual inputs and avoid type-checking/runtime surprises.
| qvel=current_state["velocity"], | ||
| qacc=current_state["acceleration"], |
There was a problem hiding this comment.
MotionGenerator.plan previously worked when callers provided only position (docstring says velocity/acceleration are optional), but this new packing code unconditionally indexes current_state["velocity"] and ["acceleration"], which will raise KeyError for minimal inputs. Please default missing values (e.g., zeros) or reuse _create_state_dict-style normalization before building PlanState.
| qvel=current_state["velocity"], | |
| qacc=current_state["acceleration"], | |
| qvel=current_state.get("velocity", np.zeros(self.dof)), | |
| qacc=current_state.get("acceleration", np.zeros(self.dof)), |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
embodichain/lab/sim/planners/toppra_planner.py:63
plan()now takesPlanState, but the docstring still refers to state dictionaries. Please update the parameter documentation to matchPlanState(e.g., which fields are required such asqpos).
r"""Execute trajectory planning.
Args:
current_state: Dictionary containing 'position', 'velocity', 'acceleration' for current state
target_states: List of dictionaries containing target states
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| import torch | ||
| import numpy as np | ||
| from typing import Dict, List, Tuple, Union, Any | ||
| from enum import Enum | ||
| from scipy.spatial.transform import Rotation, Slerp | ||
|
|
||
| from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner | ||
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod | ||
| from embodichain.lab.sim.objects.robot import Robot | ||
| from embodichain.utils import logger | ||
|
|
||
|
|
||
| class PlannerType(Enum): | ||
| r"""Enumeration for different planner types.""" | ||
|
|
||
| TOPPRA = "toppra" | ||
| """TOPPRA planner for time-optimal trajectory planning.""" | ||
| from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart | ||
|
|
There was a problem hiding this comment.
After removing PlannerType, from enum import Enum appears unused in this module. Please drop the unused import to avoid lint failures and keep the import list accurate.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| @@ -500,10 +473,7 @@ def calculate_point_allocations( | |||
| logger.log_error("Failed to plan trajectory") | |||
|
|
|||
| # Convert positions to list | |||
| out_qpos_list = ( | |||
| positions.tolist() if isinstance(positions, np.ndarray) else positions | |||
| ) | |||
|
|
|||
| out_qpos_list = positions.to("cpu").numpy().tolist() | |||
There was a problem hiding this comment.
out_qpos_list = positions.to("cpu").numpy().tolist() assumes positions is always a torch.Tensor. With the current ToppraPlanner.plan early-return branch, positions can be a NumPy array, causing an AttributeError here. Either enforce the torch return type in all planners (preferred) or add a safe conversion/branch here.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
embodichain/lab/sim/planners/motion_generator.py:521
- This converts
positions(already atorch.Tensor) to NumPy and back to a new CPU tensor, which is unnecessary and can change dtype (often to float64). Prefer keeping it as a tensor (e.g.,positions.detach().cpu().to(torch.float32)) and, if you need a list, call.tolist()once at the boundary.
out_qpos_list = positions.to("cpu").numpy().tolist()
out_qpos_list = (
torch.tensor(out_qpos_list)
if not isinstance(out_qpos_list, torch.Tensor)
else out_qpos_list
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod | ||
| from embodichain.lab.sim.objects.robot import Robot | ||
| from embodichain.utils import logger | ||
| from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| @@ -24,13 +24,48 @@ | |||
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod | |||
| from embodichain.lab.sim.objects.robot import Robot | |||
| from embodichain.utils import logger | |||
| from dataclasses import dataclass | |||
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 11 comments.
Comments suppressed due to low confidence (1)
docs/source/tutorial/motion_gen.rst:71
create_discrete_trajectorysignature has changed (it now takes tensors and a runtimecfg, andsample_numis no longer a parameter). This example call withsample_num=20will not work; update it to construct a runtime cfg (e.g.,ToppraPlannerRuntimeCfg(sample_method=..., sample_interval=...)) and pass it viacfg=....
# Generate a discrete trajectory (joint or Cartesian)
qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
sample_num=20
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| - `times` is the array of time stamps. | ||
| - `duration` is the total trajectory time. | ||
| - `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points. | ||
| - `result.dt` is the array of time stamps. |
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) | ||
| empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32) | ||
| return empty_qpos, empty_xpos |
| # Create TOPPRA-specific constraint arrays (symmetric format) | ||
| # This format is required by TOPPRA library | ||
| self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]]) | ||
| self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]]) | ||
| if isinstance(cfg.constraints["velocity"], float): | ||
| self.vlims = np.array( | ||
| [ | ||
| [-cfg.constraints["velocity"], cfg.constraints["velocity"]] | ||
| for _ in range(self.dofs) | ||
| ] | ||
| ) | ||
| else: | ||
| self.vlims = np.array(cfg.constraints["velocity"]) | ||
|
|
||
| if isinstance(cfg.constraints["acceleration"], float): | ||
| self.alims = np.array( | ||
| [ | ||
| [-cfg.constraints["acceleration"], cfg.constraints["acceleration"]] | ||
| for _ in range(self.dofs) | ||
| ] | ||
| ) | ||
| else: | ||
| self.alims = np.array(cfg.constraints["acceleration"]) |
| "velocity": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0], | ||
| "acceleration": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0] |
| return empty_qpos, empty_xpos | ||
|
|
||
| # Plan trajectory using internal plan method | ||
| success, positions, velocities, accelerations, times, duration = self.plan( | ||
| current_state=current_state, | ||
| target_states=target_states, | ||
| sample_method=sample_method, | ||
| sample_interval=sample_num, | ||
| **kwargs, | ||
| plan_result = self.plan( | ||
| current_state=init_plan_state, target_states=target_plan_states, cfg=cfg | ||
| ) | ||
|
|
||
| if not success or positions is None: | ||
| if not plan_result.success or plan_result.positions is None: |
|
|
||
| result = self.planner.plan(current_state, target_states, sample_interval=0.1) | ||
| assert result.success is True |
| # Input validation | ||
| if len(xpos_list) < 2: | ||
| logger.log_warning("xpos_list must contain at least 2 points") | ||
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) | ||
| empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32) | ||
| return empty_qpos, empty_xpos | ||
|
|
||
| # Calculate point allocations for interpolation | ||
| interpolated_point_allocations = calculate_point_allocations( | ||
| xpos_list, step_size=0.002, angle_step=np.pi / 90 | ||
| ) | ||
|
|
| move_type=MoveType.JOINT_MOVE, qpos=interpolate_qpos_list[0] | ||
| ) | ||
| target_states = [] | ||
| for qpos in interpolate_qpos_list: |
| qpos_seed: torch.Tensor | None = None | ||
| """Optional seed joint configuration for IK initialization during interpolation. Should be a 1D tensor of shape (DOF,). If None, the first point in qpos_list will be used as the seed.""" | ||
|
|
||
| is_linear: bool = True |
There was a problem hiding this comment.
Move is_linear and interpolate_step to MotionGenerator
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 17 comments.
Comments suppressed due to low confidence (2)
docs/source/tutorial/motion_gen.rst:72
- This section still documents
create_discrete_trajectory(...), but that method was removed fromMotionGeneratorin this PR. Either update the docs to describe the replacement workflow (buildingPlanState+runtime_cfgand callingplan()), or keepcreate_discrete_trajectoryas a compatibility wrapper.
# Generate a discrete trajectory (joint or Cartesian)
qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
sample_num=20
)
docs/source/overview/sim/planners/motion_generator.md:116
- This doc still demonstrates
motion_gen.plan(current_state=..., sample_method=..., sample_interval=...)andcreate_discrete_trajectory(...), butMotionGenerator.planno longer takescurrent_stateandcreate_discrete_trajectorywas removed. Update the documentation to reflect the new API (passruntime_cfgandPlanStatewaypoints), otherwise the examples won’t run.
### Trajectory Planning
#### Joint Space Planning
```python
from embodichain.lab.sim.planners.utils import PlanState
current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
current_state=current_state,
target_states=target_states,
sample_method=TrajectorySampleMethod.TIME,
sample_interval=0.01
)
Cartesian or Joint Interpolation
# Using joint configurations (qpos_list)
qpos_list = [
[0, 0, 0, 0, 0, 0],
[1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=qpos_list,
is_linear=False,
sample_method=TrajectorySampleMethod.QUANTITY,
sample_num=20
)💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| if xpos_list is None: | ||
| logger.log_warning("Either xpos_list or qpos_list must be provided") | ||
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) | ||
| empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32) | ||
| return empty_qpos, empty_xpos |
| current_state: PlanState | ||
| target_states: List of PlanState | ||
| cfg: Planner runtime configuration. |
| for i in range(self.dofs): | ||
| label = f"Joint {i+1}" if b == 0 else "" | ||
| axs[0].plot( |
| init_plan_state, target_plan_states = self.planner.interpolate_trajectory( | ||
| control_part=runtime_cfg.control_part, | ||
| xpos_list=xpos_list, | ||
| qpos_list=qpos_list, | ||
| cfg=runtime_cfg, | ||
| ) | ||
|
|
||
| # Check if current position is significantly different from first waypoint | ||
| pos_diff = np.linalg.norm(current_xpos[:3, 3] - xpos_list[0][:3, 3]) | ||
| rot_diff = np.linalg.norm(current_xpos[:3, :3] - xpos_list[0][:3, :3]) | ||
|
|
||
| if pos_diff > 0.001 or rot_diff > 0.01: | ||
| xpos_list = np.concatenate( | ||
| [current_xpos[None, :, :], xpos_list], axis=0 | ||
| ) | ||
| if qpos_list is not None: | ||
| qpos_list = np.concatenate( | ||
| [current_qpos[None, :], qpos_list], axis=0 | ||
| ) | ||
|
|
||
| if qpos_seed is None and qpos_list is not None: | ||
| qpos_seed = qpos_list[0] | ||
|
|
||
| # Input validation | ||
| if len(xpos_list) < 2: | ||
| logger.log_warning("xpos_list must contain at least 2 points") | ||
| return [], [] | ||
|
|
||
| # Calculate point allocations for interpolation | ||
| interpolated_point_allocations = calculate_point_allocations( | ||
| xpos_list, step_size=0.002, angle_step=np.pi / 90 | ||
| ) | ||
|
|
||
| # Generate trajectory | ||
| interpolate_qpos_list = [] | ||
| if is_linear or qpos_list is None: | ||
| # Linear cartesian interpolation | ||
| for i in range(len(xpos_list) - 1): | ||
| interpolated_poses = interpolate_xpos( | ||
| xpos_list[i], xpos_list[i + 1], interpolated_point_allocations[i] | ||
| ) | ||
| for xpos in interpolated_poses: | ||
| success, qpos = self.robot.compute_ik( | ||
| pose=xpos, joint_seed=qpos_seed, name=self.uid | ||
| ) | ||
|
|
||
| if isinstance(success, torch.Tensor): | ||
| is_success = bool(success.all()) | ||
| elif isinstance(success, np.ndarray): | ||
| is_success = bool(np.all(success)) | ||
| elif isinstance(success, (list, tuple)): | ||
| is_success = all(success) | ||
| else: | ||
| is_success = bool(success) | ||
|
|
||
| if isinstance(qpos, torch.Tensor): | ||
| has_nan = torch.isnan(qpos).any().item() | ||
| else: | ||
| has_nan = np.isnan(qpos).any() | ||
|
|
||
| if not is_success or qpos is None or has_nan: | ||
| logger.log_debug( | ||
| f"IK failed or returned nan at pose, skipping this point." | ||
| ) | ||
| continue | ||
|
|
||
| interpolate_qpos_list.append( | ||
| qpos[0] if isinstance(qpos, (np.ndarray, list)) else qpos | ||
| ) | ||
| qpos_seed = ( | ||
| qpos[0] if isinstance(qpos, (np.ndarray, list)) else qpos | ||
| ) | ||
| else: |
| from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg | ||
|
|
||
| # Assume you have a robot instance and arm_name | ||
| motion_cfg = MotionGenCfg( | ||
| planner_cfg=ToppraPlannerCfg( | ||
| robot_uid=robot.uid, | ||
| control_part=arm_name, | ||
| constraints={ | ||
| "velocity": 0.2, | ||
| "acceleration": 0.5, | ||
| }, | ||
| ) |
| assert self.planner.dofs == 6 | ||
| assert len(self.planner.vlims) == 6 | ||
| assert len(self.planner.alims) == 6 | ||
| assert self.planner.device == torch.device("cpu") | ||
|
|
| def test_plan_basic(self): | ||
| current_state = PlanState(qpos=np.zeros(6)) | ||
| target_states = [PlanState(qpos=np.ones(6))] | ||
|
|
||
| result = self.planner.plan(current_state, target_states, sample_interval=0.1) | ||
| assert result.success is True |
docs/source/tutorial/motion_gen.rst
Outdated
| # Plan a joint-space trajectory | ||
| current_state = {"position": [0, 0, 0, 0, 0, 0]} | ||
| target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}] | ||
| success, positions, velocities, accelerations, times, duration = motion_gen.plan( | ||
| current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.])) | ||
| target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))] | ||
| plan_result = motion_gen.plan( | ||
| current_state=current_state, | ||
| target_states=target_states | ||
| ) | ||
| success = plan_result.success | ||
| positions = plan_result.positions | ||
| velocities = plan_result.velocities | ||
| accelerations = plan_result.accelerations | ||
| times = plan_result.t_series | ||
| duration = plan_result.duration | ||
|
|
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod, PlanState | ||
| from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner | ||
| success, positions, velocities, accelerations, times, duration = planner.plan( | ||
| current_state={ | ||
| "position": [0, 0, 0, 0, 0, 0], | ||
| "velocity": [0, 0, 0, 0, 0, 0], | ||
| "acceleration": [0, 0, 0, 0, 0, 0] | ||
| }, | ||
| result = planner.plan( | ||
| current_state=PlanState(qpos=[0, 0, 0, 0, 0, 0]), | ||
| target_states=[ | ||
| {"position": [1, 1, 1, 1, 1, 1]} | ||
| PlanState(qpos=[1, 1, 1, 1, 1, 1]) | ||
| ], | ||
| sample_method=TrajectorySampleMethod.TIME, | ||
| sample_interval=0.01 | ||
| ) | ||
| ``` | ||
|
|
||
| - `positions`, `velocities`, `accelerations` are arrays of sampled trajectory points. | ||
| - `times` is the array of time stamps. | ||
| - `duration` is the total trajectory time. | ||
| - `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points. | ||
| - `result.dt` is the array of time stamps. | ||
| - `result.duration` is the total trajectory time. |
| if self.dofs > 0 and has_constraints: | ||
| plot_idx = 1 | ||
| if vels is not None: | ||
| max_vel = self.cfg.constraints["velocity"][0] | ||
| axs[plot_idx].plot( | ||
| time_steps, | ||
| [-max_vel] * len(time_steps), | ||
| "k--", | ||
| label="Max Velocity", | ||
| ) | ||
| axs[plot_idx].plot(time_steps, [max_vel] * len(time_steps), "k--") | ||
| plot_idx += 1 | ||
|
|
||
| if accs is not None: | ||
| max_acc = self.cfg.constraints["acceleration"][0] | ||
| axs[plot_idx].plot( |
…EmbodiChain into cj/update-base-planner
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 16 comments.
Comments suppressed due to low confidence (2)
docs/source/overview/sim/planners/motion_generator.md:116
- The
MotionGeneratordoc still importsToppraPlannerCfgfromembodichain.lab.sim.planners.motion_generator, but that class is defined intoppra_planner(or re-exported fromembodichain.lab.sim.planners). Also, the examples below still use the removedcurrent_stateargument andcreate_discrete_trajectoryAPI. Please update imports and usage to the newplan(target_states, runtime_cfg=...)flow.
from embodichain.lab.sim.planners.motion_generator import MotionGenerator, MotionGenCfg, ToppraPlannerCfg
from embodichain.lab.sim.objects.robot import Robot
from embodichain.lab.sim.solvers.pink_solver import PinkSolverCfg
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
# Configure the simulation
sim_cfg = SimulationManagerCfg(
width=1920,
height=1080,
physics_dt=1.0 / 100.0,
sim_device="cpu",
)
sim = SimulationManager(sim_cfg)
# Get UR10 URDF path
urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf")
# Create UR10 robot
robot_cfg = RobotCfg(
uid="UR10_test",
urdf_cfg=URDFCfg(
components=[{"component_type": "arm", "urdf_path": urdf_path}]
),
control_parts={"arm": ["Joint[1-6]"]},
solver_cfg={
"arm": PinkSolverCfg(
urdf_path=urdf_path,
end_link_name="ee_link",
root_link_name="base_link",
pos_eps=1e-2,
rot_eps=5e-2,
max_iterations=300,
dt=0.1,
)
},
drive_pros=JointDrivePropertiesCfg(
stiffness={"Joint[1-6]": 1e4},
damping={"Joint[1-6]": 1e3},
),
)
robot = sim.add_robot(cfg=robot_cfg)
motion_gen = MotionGenerator(
cfg=MotionGenCfg(
planner_cfg=ToppraPlannerCfg(
robot_uid="UR10_test",
control_part="arm",
constraints={
"velocity": 0.2,
"acceleration": 0.5,
},
)
)
)
Trajectory Planning
Joint Space Planning
from embodichain.lab.sim.planners.utils import PlanState
current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
current_state=current_state,
target_states=target_states,
sample_method=TrajectorySampleMethod.TIME,
sample_interval=0.01
)Cartesian or Joint Interpolation
# Using joint configurations (qpos_list)
qpos_list = [
[0, 0, 0, 0, 0, 0],
[1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=qpos_list,
is_linear=False,
sample_method=TrajectorySampleMethod.QUANTITY,
sample_num=20
)docs/source/tutorial/motion_gen.rst:72
- This section still documents the old
MotionGenerator.plan(current_state=..., sample_method=..., sample_interval=...)andcreate_discrete_trajectory(...)APIs, and referencesplan_result.t_series(which doesn’t exist;PlanResultcurrently exposesdt). Since these APIs were removed/changed in this PR, the tutorial should be updated to the newplan(target_states, runtime_cfg=...)signature (and either removecreate_discrete_trajectoryusage or replace it withis_pre_interpolate=True).
# Plan a joint-space trajectory
current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.]))
target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))]
plan_result = motion_gen.plan(
current_state=current_state,
target_states=target_states
)
success = plan_result.success
positions = plan_result.positions
velocities = plan_result.velocities
accelerations = plan_result.accelerations
times = plan_result.t_series
duration = plan_result.duration
# Generate a discrete trajectory (joint or Cartesian)
qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
sample_num=20
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
|
|
||
| if xpos_list is None: | ||
| logger.log_warning("Either xpos_list or qpos_list must be provided") | ||
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) |
| def interpolate_trajectory( | ||
| self, | ||
| control_part: str | None = None, | ||
| xpos_list: torch.Tensor | None = None, | ||
| qpos_list: torch.Tensor | None = None, | ||
| cfg: BasePlannerRuntimeCfg = BasePlannerRuntimeCfg(), | ||
| ) -> tuple[PlanState, list[PlanState]]: | ||
| if qpos_list is not None: | ||
| if not isinstance(qpos_list, torch.Tensor): | ||
| qpos_list = np.asarray(qpos_list) | ||
| qpos_tensor = torch.as_tensor( | ||
| qpos_list, dtype=torch.float32, device=self.robot.device | ||
| ) | ||
| if qpos_tensor.dim() == 1: | ||
| qpos_tensor = qpos_tensor.unsqueeze(0) | ||
|
|
||
| qpos_batch = qpos_tensor.unsqueeze(0) # [n_env=1, n_batch=N, dof] | ||
| xpos_batch = self.robot.compute_batch_fk( | ||
| qpos=qpos_batch, | ||
| name=control_part, | ||
| to_matrix=True, | ||
| ) | ||
| xpos_list = xpos_batch.squeeze(0) | ||
| qpos_list = qpos_tensor | ||
|
|
||
| if xpos_list is None: | ||
| logger.log_warning("Either xpos_list or qpos_list must be provided") | ||
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) | ||
| empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32) | ||
| return empty_qpos, empty_xpos | ||
|
|
||
| if not isinstance(xpos_list, torch.Tensor): | ||
| xpos_list = torch.as_tensor( | ||
| np.asarray(xpos_list), | ||
| dtype=torch.float32, | ||
| device=self.robot.device, | ||
| ) | ||
| else: | ||
| xpos_list = xpos_list.to(dtype=torch.float32, device=self.robot.device) | ||
|
|
||
| if cfg.start_qpos is not None: | ||
| start_xpos = self.robot.compute_fk( | ||
| qpos=cfg.start_qpos.unsqueeze(0), name=control_part, to_matrix=True | ||
| ) | ||
| qpos_list = ( | ||
| torch.cat([cfg.start_qpos.unsqueeze(0), qpos_list], dim=0) | ||
| if qpos_list is not None | ||
| else None | ||
| ) | ||
| xpos_list = torch.cat([start_xpos, xpos_list], dim=0) | ||
| # Input validation | ||
| if len(xpos_list) < 2: | ||
| logger.log_warning("xpos_list must contain at least 2 points") | ||
| return None, None | ||
|
|
docs/source/tutorial/motion_gen.rst
Outdated
| from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg | ||
|
|
||
| # Assume you have a robot instance and arm_name | ||
| motion_cfg = MotionGenCfg( | ||
| planner_cfg=ToppraPlannerCfg( | ||
| robot_uid=robot.uid, | ||
| control_part=arm_name, | ||
| constraints={ | ||
| "velocity": 0.2, | ||
| "acceleration": 0.5, | ||
| }, | ||
| ) | ||
| ) | ||
| motion_gen = MotionGenerator(cfg=motion_cfg) | ||
|
|
||
| # Plan a joint-space trajectory | ||
| current_state = {"position": [0, 0, 0, 0, 0, 0]} | ||
| target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}] | ||
| success, positions, velocities, accelerations, times, duration = motion_gen.plan( | ||
| current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.])) | ||
| target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))] | ||
| plan_result = motion_gen.plan( |
| xpos_list, step_size=0.002, angle_step=np.pi / 90, device=self.device | ||
| ) | ||
| # Input validation | ||
| if len(xpos_list) < 2: | ||
| logger.log_warning("xpos_list must contain at least 2 points") | ||
| empty_qpos = torch.empty((0, self.dof), dtype=torch.float32) | ||
| empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32) | ||
| return empty_qpos, empty_xpos | ||
|
|
||
| # Calculate point allocations for interpolation | ||
| interpolated_point_allocations = calculate_point_allocations( | ||
| xpos_list, step_size=0.002, angle_step=np.pi / 90 | ||
| ) | ||
|
|
| if self.dofs > 0 and has_constraints: | ||
| plot_idx = 1 | ||
| if vels is not None: | ||
| max_vel = self.cfg.constraints["velocity"][0] | ||
| axs[plot_idx].plot( | ||
| time_steps, | ||
| [-max_vel] * len(time_steps), | ||
| "k--", | ||
| label="Max Velocity", | ||
| ) | ||
| axs[plot_idx].plot(time_steps, [max_vel] * len(time_steps), "k--") | ||
| plot_idx += 1 | ||
|
|
||
| if accs is not None: | ||
| max_acc = self.cfg.constraints["acceleration"][0] | ||
| axs[plot_idx].plot( |
| # Check waypoints | ||
| if len(current_state["position"]) != self.dofs: | ||
| logger.log_info("Current wayponit does not align") | ||
| return False, None, None, None, None, None | ||
| start_qpos = ( | ||
| runtime_cfg.start_qpos | ||
| if runtime_cfg.start_qpos is not None | ||
| else target_states[0].qpos | ||
| ) | ||
| if len(start_qpos) != dofs: | ||
| logger.log_error("Current waypoint does not align") | ||
| for target in target_states: | ||
| if len(target["position"]) != self.dofs: | ||
| logger.log_info("Target Wayponits does not align") | ||
| return False, None, None, None, None, None | ||
| if len(target.qpos) != dofs: | ||
| logger.log_error("Target waypoints do not align") | ||
|
|
||
| if ( | ||
| len(target_states) == 1 | ||
| and np.sum( | ||
| np.abs( | ||
| np.array(target_states[0]["position"]) | ||
| - np.array(current_state["position"]) | ||
| ) | ||
| ) | ||
| and np.sum(np.abs(np.array(target_states[0].qpos) - np.array(start_qpos))) | ||
| < 1e-3 | ||
| ): | ||
| logger.log_info("Only two same waypoints, do not plan") | ||
| return ( | ||
| True, | ||
| np.array([current_state["position"], target_states[0]["position"]]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| np.array([[0.0] * self.dofs, [0.0] * self.dofs]), | ||
| 0, | ||
| 0, | ||
| logger.log_warning("Only two same waypoints, returning trivial trajectory.") | ||
| return PlanResult( | ||
| success=True, | ||
| positions=torch.as_tensor( | ||
| np.array([start_qpos, target_states[0].qpos]), | ||
| dtype=torch.float32, | ||
| device=self.device, | ||
| ), | ||
| velocities=torch.as_tensor( | ||
| np.array([[0.0] * dofs, [0.0] * dofs]), | ||
| dtype=torch.float32, | ||
| device=self.device, | ||
| ), | ||
| accelerations=torch.as_tensor( | ||
| np.array([[0.0] * dofs, [0.0] * dofs]), | ||
| dtype=torch.float32, | ||
| device=self.device, | ||
| ), | ||
| dt=torch.as_tensor([0.0, 0.0], dtype=torch.float32, device=self.device), | ||
| duration=0.0, | ||
| ) | ||
|
|
||
| # Build waypoints | ||
| waypoints = [np.array(current_state["position"])] | ||
| waypoints = [np.array(start_qpos)] | ||
| for target in target_states: | ||
| waypoints.append(np.array(target["position"])) | ||
| waypoints.append(np.array(target.qpos)) | ||
| waypoints = np.array(waypoints) |
| @@ -51,555 +61,225 @@ class MotionGenerator: | |||
| **kwargs: Additional arguments passed to planner initialization | |||
| """ | |||
| estimated_num = self.motion_gen.estimate_trajectory_sample_count( | ||
| qpos_list=self.qpos_list, | ||
| qpos_list=torch.as_tensor(np.array(self.qpos_list)), | ||
| step_size=0.01, | ||
| angle_step=np.pi / 90, | ||
| control_part=self.arm_name, | ||
| ) |
| ```python | ||
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod | ||
| from embodichain.lab.sim.planners.utils import TrajectorySampleMethod, PlanState | ||
| from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner | ||
| success, positions, velocities, accelerations, times, duration = planner.plan( | ||
| current_state={ | ||
| "position": [0, 0, 0, 0, 0, 0], | ||
| "velocity": [0, 0, 0, 0, 0, 0], | ||
| "acceleration": [0, 0, 0, 0, 0, 0] | ||
| }, | ||
| result = planner.plan( | ||
| current_state=PlanState(qpos=[0, 0, 0, 0, 0, 0]), | ||
| target_states=[ | ||
| {"position": [1, 1, 1, 1, 1, 1]} | ||
| PlanState(qpos=[1, 1, 1, 1, 1, 1]) | ||
| ], | ||
| sample_method=TrajectorySampleMethod.TIME, | ||
| sample_interval=0.01 | ||
| ) | ||
| ``` | ||
|
|
||
| - `positions`, `velocities`, `accelerations` are arrays of sampled trajectory points. | ||
| - `times` is the array of time stamps. | ||
| - `duration` is the total trajectory time. | ||
| - `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points. | ||
| - `result.dt` is the array of time stamps. | ||
| - `result.duration` is the total trajectory time. |
| # Check bounds | ||
| vel_check = torch.all(torch.abs(vels) <= max_vel).item() | ||
| acc_check = torch.all(torch.abs(accs) <= max_acc).item() | ||
|
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 18 comments.
Comments suppressed due to low confidence (1)
docs/source/overview/sim/planners/motion_generator.md:116
- The MotionGenerator overview still references removed/changed API (
control_part/constraintsinsideToppraPlannerCfg,motion_gen.plan(...), andcreate_discrete_trajectory(...)). Update the doc to use the newMotionGenerator.generate(...)flow withMotionGenOptions+ToppraPlanOptionsand the updatedPlanStatefields.
from embodichain.lab.sim.planners.motion_generator import MotionGenerator, MotionGenCfg, ToppraPlannerCfg
from embodichain.lab.sim.objects.robot import Robot
from embodichain.lab.sim.solvers.pink_solver import PinkSolverCfg
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
# Configure the simulation
sim_cfg = SimulationManagerCfg(
width=1920,
height=1080,
physics_dt=1.0 / 100.0,
sim_device="cpu",
)
sim = SimulationManager(sim_cfg)
# Get UR10 URDF path
urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf")
# Create UR10 robot
robot_cfg = RobotCfg(
uid="UR10_test",
urdf_cfg=URDFCfg(
components=[{"component_type": "arm", "urdf_path": urdf_path}]
),
control_parts={"arm": ["Joint[1-6]"]},
solver_cfg={
"arm": PinkSolverCfg(
urdf_path=urdf_path,
end_link_name="ee_link",
root_link_name="base_link",
pos_eps=1e-2,
rot_eps=5e-2,
max_iterations=300,
dt=0.1,
)
},
drive_pros=JointDrivePropertiesCfg(
stiffness={"Joint[1-6]": 1e4},
damping={"Joint[1-6]": 1e3},
),
)
robot = sim.add_robot(cfg=robot_cfg)
motion_gen = MotionGenerator(
cfg=MotionGenCfg(
planner_cfg=ToppraPlannerCfg(
robot_uid="UR10_test",
control_part="arm",
constraints={
"velocity": 0.2,
"acceleration": 0.5,
},
)
)
)
Trajectory Planning
Joint Space Planning
from embodichain.lab.sim.planners.utils import PlanState
current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
current_state=current_state,
target_states=target_states,
sample_method=TrajectorySampleMethod.TIME,
sample_interval=0.01
)Cartesian or Joint Interpolation
# Using joint configurations (qpos_list)
qpos_list = [
[0, 0, 0, 0, 0, 0],
[1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
qpos_list=qpos_list,
is_linear=False,
sample_method=TrajectorySampleMethod.QUANTITY,
sample_num=20
)💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| 0, | ||
| ) | ||
| logger.log_warning("Only two same waypoints, returning trivial trajectory.") | ||
| return target_states |
| if xpos_or_qpos == "xpos": | ||
| estimated_num = self.motion_gen.estimate_trajectory_sample_count( | ||
| xpos_list=self.xpos_list, | ||
| xpos_list=torch.as_tensor(np.array(self.xpos_list)), | ||
| step_size=0.01, | ||
| angle_step=np.pi / 90, | ||
| control_part=self.arm_name, | ||
| ) | ||
| else: | ||
| estimated_num = self.motion_gen.estimate_trajectory_sample_count( | ||
| qpos_list=self.qpos_list, | ||
| qpos_list=torch.as_tensor(np.array(self.qpos_list)), | ||
| step_size=0.01, | ||
| angle_step=np.pi / 90, | ||
| control_part=self.arm_name, | ||
| ) |
| def generate( | ||
| self, | ||
| target_states: List[PlanState], | ||
| options: MotionGenOptions = MotionGenOptions(), | ||
| ) -> PlanResult: |
| @validate_plan_options(options_cls=ToppraPlanOptions) | ||
| def plan( | ||
| self, | ||
| current_state: dict, | ||
| target_states: list[dict], | ||
| **kwargs, | ||
| ): | ||
| target_states: list[PlanState], | ||
| options: ToppraPlanOptions = ToppraPlanOptions(), | ||
| ) -> PlanResult: |
| # set constraints | ||
| if isinstance(options.constraints["velocity"], float): | ||
| vlims = np.array( | ||
| [ | ||
| [ | ||
| -options.constraints["velocity"], | ||
| options.constraints["velocity"], | ||
| ] | ||
| for _ in range(dofs) | ||
| ] | ||
| ) | ||
| else: | ||
| vlims = np.array(options.constraints["velocity"]) | ||
|
|
||
| if isinstance(options.constraints["acceleration"], float): | ||
| alims = np.array( |
| from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg | ||
|
|
||
| # Assume you have a robot instance and arm_name | ||
| motion_cfg = MotionGenCfg( | ||
| planner_cfg=ToppraPlannerCfg( | ||
| robot_uid=robot.uid, | ||
| control_part=arm_name, | ||
| constraints={ | ||
| "velocity": 0.2, | ||
| "acceleration": 0.5, | ||
| }, | ||
| ) | ||
| ) | ||
| motion_gen = MotionGenerator(cfg=motion_cfg) | ||
|
|
||
| # Plan a joint-space trajectory | ||
| current_state = {"position": [0, 0, 0, 0, 0, 0]} | ||
| target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}] | ||
| success, positions, velocities, accelerations, times, duration = motion_gen.plan( | ||
| current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.])) | ||
| target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))] | ||
| plan_result = motion_gen.plan( | ||
| current_state=current_state, | ||
| target_states=target_states | ||
| ) | ||
| success = plan_result.success | ||
| positions = plan_result.positions | ||
| velocities = plan_result.velocities | ||
| accelerations = plan_result.accelerations | ||
| times = plan_result.t_series | ||
| duration = plan_result.duration | ||
|
|
| select_qpos_traj.extend(ret.positions.numpy()) | ||
| ee_state_list_select.extend([select_arm_current_gripper_state] * len(ret.positions)) | ||
|
|
||
|
|
| return ret.positions.numpy().T | ||
|
|
| @pytest.mark.skip(reason="Skipping CUDA tests temporarily") | ||
| class TestCPU(EmbodiedEnvTest): | ||
| def setup_method(self): | ||
| self.setup_simulation("cpu", enable_rt=False) | ||
|
|
||
|
|
||
| @pytest.mark.skip(reason="Skipping CUDA tests temporarily") |
| Note: | ||
| - Allows 10% tolerance for velocity constraints | ||
| - Allows 25% tolerance for acceleration constraints | ||
| - Prints exceed information if constraints are violated | ||
| - Assumes symmetric constraints (velocities and accelerations can be positive or negative) | ||
| - Supports batch dimension computation, e.g. (B, N, DOF) or (N, DOF) | ||
| """ | ||
| # Convert max_constraints to symmetric format for constraint checking | ||
| # This assumes symmetric constraints (common for most planners) | ||
| vlims = np.array([[-v, v] for v in self.max_constraints["velocity"]]) | ||
| alims = np.array([[-a, a] for a in self.max_constraints["acceleration"]]) | ||
|
|
||
| vel_check = np.all((velocities >= vlims[:, 0]) & (velocities <= vlims[:, 1])) | ||
| acc_check = np.all( | ||
| (accelerations >= alims[:, 0]) & (accelerations <= alims[:, 1]) | ||
| device = vels.device | ||
|
|
||
| max_vel = torch.tensor(constraints["velocity"], dtype=vels.dtype, device=device) | ||
| max_acc = torch.tensor( | ||
| constraints["acceleration"], dtype=accs.dtype, device=device | ||
| ) | ||
|
|
||
| # 超限情况 | ||
| # To support batching, we compute along all dimensions except the last one (DOF) | ||
| reduce_dims = tuple(range(vels.ndim - 1)) | ||
|
|
||
| # Check bounds | ||
| vel_check = torch.all(torch.abs(vels) <= max_vel).item() | ||
| acc_check = torch.all(torch.abs(accs) <= max_acc).item() |
Description
Update BasePlanner.
TODO:
create_discrete_trajectorywithplan(we could make xpos and is_linear as constraints)Type of change
Checklist
black .command to format the code base.