Skip to content

Update base planner#173

Merged
yuecideng merged 35 commits intomainfrom
cj/update-base-planner
Mar 19, 2026
Merged

Update base planner#173
yuecideng merged 35 commits intomainfrom
cj/update-base-planner

Conversation

@matafela
Copy link
Collaborator

@matafela matafela commented Mar 11, 2026

Description

Update BasePlanner.

TODO:

  • make modification to existed examples and environments
  • refacotr create_discrete_trajectory with plan (we could make xpos and is_linear as constraints)

Type of change

  • Enhancement (non-breaking change which improves an existing functionality)

Checklist

  • I have run the black . command to format the code base.
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • Dependencies have been updated, if applicable.

Copilot AI review requested due to automatic review settings March 11, 2026 08:48
@matafela matafela requested a review from yuecideng March 11, 2026 08:48
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the planning API to use a structured PlanState object (instead of raw dicts) and adapts the TOPPRA planner and motion generator to the new interface.

Changes:

  • Introduces PlanState (plus MoveType / MovePart) in planner utils.
  • Refactors BasePlanner and ToppraPlanner to accept PlanState inputs and initializes planners via **kwargs.
  • Updates MotionGenerator to construct PlanState objects before invoking the planner.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.

File Description
embodichain/lab/sim/planners/utils.py Adds PlanState and related enums used to represent planning inputs more explicitly.
embodichain/lab/sim/planners/base_planner.py Updates base planner API to accept PlanState and switches init to **kwargs.
embodichain/lab/sim/planners/toppra_planner.py Updates TOPPRA planner to accept PlanState and adjusts waypoint handling accordingly.
embodichain/lab/sim/planners/motion_generator.py Packs dict-based states into PlanState before calling the underlying planner.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +80 to +84
move_part: MovePart = MovePart.LEFT
xpos: torch.Tensor = None
qpos: torch.Tensor = None
qacc: torch.Tensor = None
qvel: torch.Tensor = None
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PlanState fields are annotated as torch.Tensor but default to None (and are also populated with lists in MotionGenerator.plan). Consider widening these types (e.g., torch.Tensor | np.ndarray | Sequence[float] | None) and/or using Optional[...] to match the actual inputs and avoid type-checking/runtime surprises.

Copilot uses AI. Check for mistakes.
Comment on lines +262 to +263
qvel=current_state["velocity"],
qacc=current_state["acceleration"],
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MotionGenerator.plan previously worked when callers provided only position (docstring says velocity/acceleration are optional), but this new packing code unconditionally indexes current_state["velocity"] and ["acceleration"], which will raise KeyError for minimal inputs. Please default missing values (e.g., zeros) or reuse _create_state_dict-style normalization before building PlanState.

Suggested change
qvel=current_state["velocity"],
qacc=current_state["acceleration"],
qvel=current_state.get("velocity", np.zeros(self.dof)),
qacc=current_state.get("acceleration", np.zeros(self.dof)),

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings March 12, 2026 04:06
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

Comments suppressed due to low confidence (1)

embodichain/lab/sim/planners/toppra_planner.py:63

  • plan() now takes PlanState, but the docstring still refers to state dictionaries. Please update the parameter documentation to match PlanState (e.g., which fields are required such as qpos).
        r"""Execute trajectory planning.

        Args:
            current_state: Dictionary containing 'position', 'velocity', 'acceleration' for current state
            target_states: List of dictionaries containing target states


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines 17 to 28
import torch
import numpy as np
from typing import Dict, List, Tuple, Union, Any
from enum import Enum
from scipy.spatial.transform import Rotation, Slerp

from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.objects.robot import Robot
from embodichain.utils import logger


class PlannerType(Enum):
r"""Enumeration for different planner types."""

TOPPRA = "toppra"
"""TOPPRA planner for time-optimal trajectory planning."""
from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart

Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After removing PlannerType, from enum import Enum appears unused in this module. Please drop the unused import to avoid lint failures and keep the import list accurate.

Copilot uses AI. Check for mistakes.
chenjian added 2 commits March 12, 2026 14:38
Copilot AI review requested due to automatic review settings March 12, 2026 07:13
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +472 to +476
@@ -500,10 +473,7 @@ def calculate_point_allocations(
logger.log_error("Failed to plan trajectory")

# Convert positions to list
out_qpos_list = (
positions.tolist() if isinstance(positions, np.ndarray) else positions
)

out_qpos_list = positions.to("cpu").numpy().tolist()
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out_qpos_list = positions.to("cpu").numpy().tolist() assumes positions is always a torch.Tensor. With the current ToppraPlanner.plan early-return branch, positions can be a NumPy array, causing an AttributeError here. Either enforce the torch return type in all planners (preferred) or add a safe conversion/branch here.

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings March 13, 2026 08:05
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (1)

embodichain/lab/sim/planners/motion_generator.py:521

  • This converts positions (already a torch.Tensor) to NumPy and back to a new CPU tensor, which is unnecessary and can change dtype (often to float64). Prefer keeping it as a tensor (e.g., positions.detach().cpu().to(torch.float32)) and, if you need a list, call .tolist() once at the boundary.
        out_qpos_list = positions.to("cpu").numpy().tolist()
        out_qpos_list = (
            torch.tensor(out_qpos_list)
            if not isinstance(out_qpos_list, torch.Tensor)
            else out_qpos_list
        )

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.objects.robot import Robot
from embodichain.utils import logger
from embodichain.lab.sim.planners.utils import PlanState, MoveType, MovePart
chenjian added 2 commits March 13, 2026 17:15
Copilot AI review requested due to automatic review settings March 13, 2026 10:07
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +23 to +27
@@ -24,13 +24,48 @@
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.objects.robot import Robot
from embodichain.utils import logger
from dataclasses import dataclass
Copilot AI review requested due to automatic review settings March 17, 2026 08:44
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 11 comments.

Comments suppressed due to low confidence (1)

docs/source/tutorial/motion_gen.rst:71

  • create_discrete_trajectory signature has changed (it now takes tensors and a runtime cfg, and sample_num is no longer a parameter). This example call with sample_num=20 will not work; update it to construct a runtime cfg (e.g., ToppraPlannerRuntimeCfg(sample_method=..., sample_interval=...)) and pass it via cfg=....
   # Generate a discrete trajectory (joint or Cartesian)
   qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
       qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
       sample_num=20
   )

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

- `times` is the array of time stamps.
- `duration` is the total trajectory time.
- `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points.
- `result.dt` is the array of time stamps.
Comment on lines +324 to +326
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32)
return empty_qpos, empty_xpos
Comment on lines +74 to +94
# Create TOPPRA-specific constraint arrays (symmetric format)
# This format is required by TOPPRA library
self.vlims = np.array([[-v, v] for v in max_constraints["velocity"]])
self.alims = np.array([[-a, a] for a in max_constraints["acceleration"]])
if isinstance(cfg.constraints["velocity"], float):
self.vlims = np.array(
[
[-cfg.constraints["velocity"], cfg.constraints["velocity"]]
for _ in range(self.dofs)
]
)
else:
self.vlims = np.array(cfg.constraints["velocity"])

if isinstance(cfg.constraints["acceleration"], float):
self.alims = np.array(
[
[-cfg.constraints["acceleration"], cfg.constraints["acceleration"]]
for _ in range(self.dofs)
]
)
else:
self.alims = np.array(cfg.constraints["acceleration"])
Comment on lines 21 to 22
"velocity": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
"acceleration": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
Comment on lines +188 to +195
return empty_qpos, empty_xpos

# Plan trajectory using internal plan method
success, positions, velocities, accelerations, times, duration = self.plan(
current_state=current_state,
target_states=target_states,
sample_method=sample_method,
sample_interval=sample_num,
**kwargs,
plan_result = self.plan(
current_state=init_plan_state, target_states=target_plan_states, cfg=cfg
)

if not success or positions is None:
if not plan_result.success or plan_result.positions is None:
Comment on lines +52 to +54

result = self.planner.plan(current_state, target_states, sample_interval=0.1)
assert result.success is True
Comment on lines +385 to +396
# Input validation
if len(xpos_list) < 2:
logger.log_warning("xpos_list must contain at least 2 points")
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32)
return empty_qpos, empty_xpos

# Calculate point allocations for interpolation
interpolated_point_allocations = calculate_point_allocations(
xpos_list, step_size=0.002, angle_step=np.pi / 90
)

move_type=MoveType.JOINT_MOVE, qpos=interpolate_qpos_list[0]
)
target_states = []
for qpos in interpolate_qpos_list:
qpos_seed: torch.Tensor | None = None
"""Optional seed joint configuration for IK initialization during interpolation. Should be a 1D tensor of shape (DOF,). If None, the first point in qpos_list will be used as the seed."""

is_linear: bool = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move is_linear and interpolate_step to MotionGenerator

Copilot AI review requested due to automatic review settings March 17, 2026 11:31
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 17 comments.

Comments suppressed due to low confidence (2)

docs/source/tutorial/motion_gen.rst:72

  • This section still documents create_discrete_trajectory(...), but that method was removed from MotionGenerator in this PR. Either update the docs to describe the replacement workflow (building PlanState + runtime_cfg and calling plan()), or keep create_discrete_trajectory as a compatibility wrapper.
   # Generate a discrete trajectory (joint or Cartesian)
   qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
       qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
       sample_num=20
   )

docs/source/overview/sim/planners/motion_generator.md:116

  • This doc still demonstrates motion_gen.plan(current_state=..., sample_method=..., sample_interval=...) and create_discrete_trajectory(...), but MotionGenerator.plan no longer takes current_state and create_discrete_trajectory was removed. Update the documentation to reflect the new API (pass runtime_cfg and PlanState waypoints), otherwise the examples won’t run.
### Trajectory Planning

#### Joint Space Planning

```python
from embodichain.lab.sim.planners.utils import PlanState

current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
    PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
    current_state=current_state,
    target_states=target_states,
    sample_method=TrajectorySampleMethod.TIME,
    sample_interval=0.01
)

Cartesian or Joint Interpolation

# Using joint configurations (qpos_list)
qpos_list = [
    [0, 0, 0, 0, 0, 0],
    [1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
    qpos_list=qpos_list,
    is_linear=False,
    sample_method=TrajectorySampleMethod.QUANTITY,
    sample_num=20
)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +321 to +325
if xpos_list is None:
logger.log_warning("Either xpos_list or qpos_list must be provided")
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32)
return empty_qpos, empty_xpos
Comment on lines +114 to +116
current_state: PlanState
target_states: List of PlanState
cfg: Planner runtime configuration.
Comment on lines +224 to +226
for i in range(self.dofs):
label = f"Joint {i+1}" if b == 0 else ""
axs[0].plot(
Comment on lines 145 to 151
init_plan_state, target_plan_states = self.planner.interpolate_trajectory(
control_part=runtime_cfg.control_part,
xpos_list=xpos_list,
qpos_list=qpos_list,
cfg=runtime_cfg,
)

# Check if current position is significantly different from first waypoint
pos_diff = np.linalg.norm(current_xpos[:3, 3] - xpos_list[0][:3, 3])
rot_diff = np.linalg.norm(current_xpos[:3, :3] - xpos_list[0][:3, :3])

if pos_diff > 0.001 or rot_diff > 0.01:
xpos_list = np.concatenate(
[current_xpos[None, :, :], xpos_list], axis=0
)
if qpos_list is not None:
qpos_list = np.concatenate(
[current_qpos[None, :], qpos_list], axis=0
)

if qpos_seed is None and qpos_list is not None:
qpos_seed = qpos_list[0]

# Input validation
if len(xpos_list) < 2:
logger.log_warning("xpos_list must contain at least 2 points")
return [], []

# Calculate point allocations for interpolation
interpolated_point_allocations = calculate_point_allocations(
xpos_list, step_size=0.002, angle_step=np.pi / 90
)

# Generate trajectory
interpolate_qpos_list = []
if is_linear or qpos_list is None:
# Linear cartesian interpolation
for i in range(len(xpos_list) - 1):
interpolated_poses = interpolate_xpos(
xpos_list[i], xpos_list[i + 1], interpolated_point_allocations[i]
)
for xpos in interpolated_poses:
success, qpos = self.robot.compute_ik(
pose=xpos, joint_seed=qpos_seed, name=self.uid
)

if isinstance(success, torch.Tensor):
is_success = bool(success.all())
elif isinstance(success, np.ndarray):
is_success = bool(np.all(success))
elif isinstance(success, (list, tuple)):
is_success = all(success)
else:
is_success = bool(success)

if isinstance(qpos, torch.Tensor):
has_nan = torch.isnan(qpos).any().item()
else:
has_nan = np.isnan(qpos).any()

if not is_success or qpos is None or has_nan:
logger.log_debug(
f"IK failed or returned nan at pose, skipping this point."
)
continue

interpolate_qpos_list.append(
qpos[0] if isinstance(qpos, (np.ndarray, list)) else qpos
)
qpos_seed = (
qpos[0] if isinstance(qpos, (np.ndarray, list)) else qpos
)
else:
Comment on lines +38 to +49
from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg

# Assume you have a robot instance and arm_name
motion_cfg = MotionGenCfg(
planner_cfg=ToppraPlannerCfg(
robot_uid=robot.uid,
control_part=arm_name,
constraints={
"velocity": 0.2,
"acceleration": 0.5,
},
)
Comment on lines +44 to +48
assert self.planner.dofs == 6
assert len(self.planner.vlims) == 6
assert len(self.planner.alims) == 6
assert self.planner.device == torch.device("cpu")

Comment on lines +49 to +54
def test_plan_basic(self):
current_state = PlanState(qpos=np.zeros(6))
target_states = [PlanState(qpos=np.ones(6))]

result = self.planner.plan(current_state, target_states, sample_interval=0.1)
assert result.success is True
Comment on lines 53 to 66
# Plan a joint-space trajectory
current_state = {"position": [0, 0, 0, 0, 0, 0]}
target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}]
success, positions, velocities, accelerations, times, duration = motion_gen.plan(
current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.]))
target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))]
plan_result = motion_gen.plan(
current_state=current_state,
target_states=target_states
)
success = plan_result.success
positions = plan_result.positions
velocities = plan_result.velocities
accelerations = plan_result.accelerations
times = plan_result.t_series
duration = plan_result.duration

Comment on lines +31 to +45
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod, PlanState
from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner
success, positions, velocities, accelerations, times, duration = planner.plan(
current_state={
"position": [0, 0, 0, 0, 0, 0],
"velocity": [0, 0, 0, 0, 0, 0],
"acceleration": [0, 0, 0, 0, 0, 0]
},
result = planner.plan(
current_state=PlanState(qpos=[0, 0, 0, 0, 0, 0]),
target_states=[
{"position": [1, 1, 1, 1, 1, 1]}
PlanState(qpos=[1, 1, 1, 1, 1, 1])
],
sample_method=TrajectorySampleMethod.TIME,
sample_interval=0.01
)
```

- `positions`, `velocities`, `accelerations` are arrays of sampled trajectory points.
- `times` is the array of time stamps.
- `duration` is the total trajectory time.
- `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points.
- `result.dt` is the array of time stamps.
- `result.duration` is the total trajectory time.
Comment on lines +257 to +272
if self.dofs > 0 and has_constraints:
plot_idx = 1
if vels is not None:
max_vel = self.cfg.constraints["velocity"][0]
axs[plot_idx].plot(
time_steps,
[-max_vel] * len(time_steps),
"k--",
label="Max Velocity",
)
axs[plot_idx].plot(time_steps, [max_vel] * len(time_steps), "k--")
plot_idx += 1

if accs is not None:
max_acc = self.cfg.constraints["acceleration"][0]
axs[plot_idx].plot(
Copilot AI review requested due to automatic review settings March 19, 2026 06:42
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 16 comments.

Comments suppressed due to low confidence (2)

docs/source/overview/sim/planners/motion_generator.md:116

  • The MotionGenerator doc still imports ToppraPlannerCfg from embodichain.lab.sim.planners.motion_generator, but that class is defined in toppra_planner (or re-exported from embodichain.lab.sim.planners). Also, the examples below still use the removed current_state argument and create_discrete_trajectory API. Please update imports and usage to the new plan(target_states, runtime_cfg=...) flow.
from embodichain.lab.sim.planners.motion_generator import MotionGenerator, MotionGenCfg, ToppraPlannerCfg
from embodichain.lab.sim.objects.robot import Robot
from embodichain.lab.sim.solvers.pink_solver import PinkSolverCfg
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod

# Configure the simulation
sim_cfg = SimulationManagerCfg(
    width=1920,
    height=1080,
    physics_dt=1.0 / 100.0,
    sim_device="cpu",
)

sim = SimulationManager(sim_cfg)

# Get UR10 URDF path
urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf")

# Create UR10 robot
robot_cfg = RobotCfg(
    uid="UR10_test",
    urdf_cfg=URDFCfg(
        components=[{"component_type": "arm", "urdf_path": urdf_path}]
    ),
    control_parts={"arm": ["Joint[1-6]"]},
    solver_cfg={
        "arm": PinkSolverCfg(
            urdf_path=urdf_path,
            end_link_name="ee_link",
            root_link_name="base_link",
            pos_eps=1e-2,
            rot_eps=5e-2,
            max_iterations=300,
            dt=0.1,
        )
    },
    drive_pros=JointDrivePropertiesCfg(
        stiffness={"Joint[1-6]": 1e4},
        damping={"Joint[1-6]": 1e3},
    ),
)
robot = sim.add_robot(cfg=robot_cfg)

motion_gen = MotionGenerator(
    cfg=MotionGenCfg(
        planner_cfg=ToppraPlannerCfg(
            robot_uid="UR10_test",
            control_part="arm",
            constraints={
                "velocity": 0.2,
                "acceleration": 0.5,
            },
        )
    )
)

Trajectory Planning

Joint Space Planning

from embodichain.lab.sim.planners.utils import PlanState

current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
    PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
    current_state=current_state,
    target_states=target_states,
    sample_method=TrajectorySampleMethod.TIME,
    sample_interval=0.01
)

Cartesian or Joint Interpolation

# Using joint configurations (qpos_list)
qpos_list = [
    [0, 0, 0, 0, 0, 0],
    [1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
    qpos_list=qpos_list,
    is_linear=False,
    sample_method=TrajectorySampleMethod.QUANTITY,
    sample_num=20
)

docs/source/tutorial/motion_gen.rst:72

  • This section still documents the old MotionGenerator.plan(current_state=..., sample_method=..., sample_interval=...) and create_discrete_trajectory(...) APIs, and references plan_result.t_series (which doesn’t exist; PlanResult currently exposes dt). Since these APIs were removed/changed in this PR, the tutorial should be updated to the new plan(target_states, runtime_cfg=...) signature (and either remove create_discrete_trajectory usage or replace it with is_pre_interpolate=True).
   # Plan a joint-space trajectory
   current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.]))
   target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))]
   plan_result = motion_gen.plan(
       current_state=current_state,
       target_states=target_states
   )
   success = plan_result.success
   positions = plan_result.positions
   velocities = plan_result.velocities
   accelerations = plan_result.accelerations
   times = plan_result.t_series
   duration = plan_result.duration

   # Generate a discrete trajectory (joint or Cartesian)
   qpos_list, xpos_list = motion_gen.create_discrete_trajectory(
       qpos_list=[[0,0,0,0,0,0],[0.5,0.2,0,0,0,0]],
       sample_num=20
   )


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.


if xpos_list is None:
logger.log_warning("Either xpos_list or qpos_list must be provided")
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
Comment on lines +296 to +350
def interpolate_trajectory(
self,
control_part: str | None = None,
xpos_list: torch.Tensor | None = None,
qpos_list: torch.Tensor | None = None,
cfg: BasePlannerRuntimeCfg = BasePlannerRuntimeCfg(),
) -> tuple[PlanState, list[PlanState]]:
if qpos_list is not None:
if not isinstance(qpos_list, torch.Tensor):
qpos_list = np.asarray(qpos_list)
qpos_tensor = torch.as_tensor(
qpos_list, dtype=torch.float32, device=self.robot.device
)
if qpos_tensor.dim() == 1:
qpos_tensor = qpos_tensor.unsqueeze(0)

qpos_batch = qpos_tensor.unsqueeze(0) # [n_env=1, n_batch=N, dof]
xpos_batch = self.robot.compute_batch_fk(
qpos=qpos_batch,
name=control_part,
to_matrix=True,
)
xpos_list = xpos_batch.squeeze(0)
qpos_list = qpos_tensor

if xpos_list is None:
logger.log_warning("Either xpos_list or qpos_list must be provided")
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32)
return empty_qpos, empty_xpos

if not isinstance(xpos_list, torch.Tensor):
xpos_list = torch.as_tensor(
np.asarray(xpos_list),
dtype=torch.float32,
device=self.robot.device,
)
else:
xpos_list = xpos_list.to(dtype=torch.float32, device=self.robot.device)

if cfg.start_qpos is not None:
start_xpos = self.robot.compute_fk(
qpos=cfg.start_qpos.unsqueeze(0), name=control_part, to_matrix=True
)
qpos_list = (
torch.cat([cfg.start_qpos.unsqueeze(0), qpos_list], dim=0)
if qpos_list is not None
else None
)
xpos_list = torch.cat([start_xpos, xpos_list], dim=0)
# Input validation
if len(xpos_list) < 2:
logger.log_warning("xpos_list must contain at least 2 points")
return None, None

Comment on lines +38 to +56
from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg

# Assume you have a robot instance and arm_name
motion_cfg = MotionGenCfg(
planner_cfg=ToppraPlannerCfg(
robot_uid=robot.uid,
control_part=arm_name,
constraints={
"velocity": 0.2,
"acceleration": 0.5,
},
)
)
motion_gen = MotionGenerator(cfg=motion_cfg)

# Plan a joint-space trajectory
current_state = {"position": [0, 0, 0, 0, 0, 0]}
target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}]
success, positions, velocities, accelerations, times, duration = motion_gen.plan(
current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.]))
target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))]
plan_result = motion_gen.plan(
Comment on lines +353 to +366
xpos_list, step_size=0.002, angle_step=np.pi / 90, device=self.device
)
# Input validation
if len(xpos_list) < 2:
logger.log_warning("xpos_list must contain at least 2 points")
empty_qpos = torch.empty((0, self.dof), dtype=torch.float32)
empty_xpos = torch.empty((0, 4, 4), dtype=torch.float32)
return empty_qpos, empty_xpos

# Calculate point allocations for interpolation
interpolated_point_allocations = calculate_point_allocations(
xpos_list, step_size=0.002, angle_step=np.pi / 90
)

Comment on lines +257 to +272
if self.dofs > 0 and has_constraints:
plot_idx = 1
if vels is not None:
max_vel = self.cfg.constraints["velocity"][0]
axs[plot_idx].plot(
time_steps,
[-max_vel] * len(time_steps),
"k--",
label="Max Velocity",
)
axs[plot_idx].plot(time_steps, [max_vel] * len(time_steps), "k--")
plot_idx += 1

if accs is not None:
max_acc = self.cfg.constraints["acceleration"][0]
axs[plot_idx].plot(
Comment on lines 132 to 175
# Check waypoints
if len(current_state["position"]) != self.dofs:
logger.log_info("Current wayponit does not align")
return False, None, None, None, None, None
start_qpos = (
runtime_cfg.start_qpos
if runtime_cfg.start_qpos is not None
else target_states[0].qpos
)
if len(start_qpos) != dofs:
logger.log_error("Current waypoint does not align")
for target in target_states:
if len(target["position"]) != self.dofs:
logger.log_info("Target Wayponits does not align")
return False, None, None, None, None, None
if len(target.qpos) != dofs:
logger.log_error("Target waypoints do not align")

if (
len(target_states) == 1
and np.sum(
np.abs(
np.array(target_states[0]["position"])
- np.array(current_state["position"])
)
)
and np.sum(np.abs(np.array(target_states[0].qpos) - np.array(start_qpos)))
< 1e-3
):
logger.log_info("Only two same waypoints, do not plan")
return (
True,
np.array([current_state["position"], target_states[0]["position"]]),
np.array([[0.0] * self.dofs, [0.0] * self.dofs]),
np.array([[0.0] * self.dofs, [0.0] * self.dofs]),
0,
0,
logger.log_warning("Only two same waypoints, returning trivial trajectory.")
return PlanResult(
success=True,
positions=torch.as_tensor(
np.array([start_qpos, target_states[0].qpos]),
dtype=torch.float32,
device=self.device,
),
velocities=torch.as_tensor(
np.array([[0.0] * dofs, [0.0] * dofs]),
dtype=torch.float32,
device=self.device,
),
accelerations=torch.as_tensor(
np.array([[0.0] * dofs, [0.0] * dofs]),
dtype=torch.float32,
device=self.device,
),
dt=torch.as_tensor([0.0, 0.0], dtype=torch.float32, device=self.device),
duration=0.0,
)

# Build waypoints
waypoints = [np.array(current_state["position"])]
waypoints = [np.array(start_qpos)]
for target in target_states:
waypoints.append(np.array(target["position"]))
waypoints.append(np.array(target.qpos))
waypoints = np.array(waypoints)
Comment on lines 46 to 62
@@ -51,555 +61,225 @@ class MotionGenerator:
**kwargs: Additional arguments passed to planner initialization
"""
Comment on lines 279 to 284
estimated_num = self.motion_gen.estimate_trajectory_sample_count(
qpos_list=self.qpos_list,
qpos_list=torch.as_tensor(np.array(self.qpos_list)),
step_size=0.01,
angle_step=np.pi / 90,
control_part=self.arm_name,
)
Comment on lines 30 to +45
```python
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod, PlanState
from embodichain.lab.sim.planners.toppra_planner import ToppraPlanner
success, positions, velocities, accelerations, times, duration = planner.plan(
current_state={
"position": [0, 0, 0, 0, 0, 0],
"velocity": [0, 0, 0, 0, 0, 0],
"acceleration": [0, 0, 0, 0, 0, 0]
},
result = planner.plan(
current_state=PlanState(qpos=[0, 0, 0, 0, 0, 0]),
target_states=[
{"position": [1, 1, 1, 1, 1, 1]}
PlanState(qpos=[1, 1, 1, 1, 1, 1])
],
sample_method=TrajectorySampleMethod.TIME,
sample_interval=0.01
)
```

- `positions`, `velocities`, `accelerations` are arrays of sampled trajectory points.
- `times` is the array of time stamps.
- `duration` is the total trajectory time.
- `result.positions`, `result.velocities`, `result.accelerations` are arrays of sampled trajectory points.
- `result.dt` is the array of time stamps.
- `result.duration` is the total trajectory time.
Comment on lines +149 to 152
# Check bounds
vel_check = torch.all(torch.abs(vels) <= max_vel).item()
acc_check = torch.all(torch.abs(accs) <= max_acc).item()

Copilot AI review requested due to automatic review settings March 19, 2026 09:37
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 18 comments.

Comments suppressed due to low confidence (1)

docs/source/overview/sim/planners/motion_generator.md:116

  • The MotionGenerator overview still references removed/changed API (control_part/constraints inside ToppraPlannerCfg, motion_gen.plan(...), and create_discrete_trajectory(...)). Update the doc to use the new MotionGenerator.generate(...) flow with MotionGenOptions + ToppraPlanOptions and the updated PlanState fields.
from embodichain.lab.sim.planners.motion_generator import MotionGenerator, MotionGenCfg, ToppraPlannerCfg
from embodichain.lab.sim.objects.robot import Robot
from embodichain.lab.sim.solvers.pink_solver import PinkSolverCfg
from embodichain.lab.sim.planners.utils import TrajectorySampleMethod

# Configure the simulation
sim_cfg = SimulationManagerCfg(
    width=1920,
    height=1080,
    physics_dt=1.0 / 100.0,
    sim_device="cpu",
)

sim = SimulationManager(sim_cfg)

# Get UR10 URDF path
urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf")

# Create UR10 robot
robot_cfg = RobotCfg(
    uid="UR10_test",
    urdf_cfg=URDFCfg(
        components=[{"component_type": "arm", "urdf_path": urdf_path}]
    ),
    control_parts={"arm": ["Joint[1-6]"]},
    solver_cfg={
        "arm": PinkSolverCfg(
            urdf_path=urdf_path,
            end_link_name="ee_link",
            root_link_name="base_link",
            pos_eps=1e-2,
            rot_eps=5e-2,
            max_iterations=300,
            dt=0.1,
        )
    },
    drive_pros=JointDrivePropertiesCfg(
        stiffness={"Joint[1-6]": 1e4},
        damping={"Joint[1-6]": 1e3},
    ),
)
robot = sim.add_robot(cfg=robot_cfg)

motion_gen = MotionGenerator(
    cfg=MotionGenCfg(
        planner_cfg=ToppraPlannerCfg(
            robot_uid="UR10_test",
            control_part="arm",
            constraints={
                "velocity": 0.2,
                "acceleration": 0.5,
            },
        )
    )
)

Trajectory Planning

Joint Space Planning

from embodichain.lab.sim.planners.utils import PlanState

current_state = PlanState(qpos=[0, 0, 0, 0, 0, 0])
target_states = [
    PlanState(qpos=[1, 1, 1, 1, 1, 1])
]
result = motion_gen.plan(
    current_state=current_state,
    target_states=target_states,
    sample_method=TrajectorySampleMethod.TIME,
    sample_interval=0.01
)

Cartesian or Joint Interpolation

# Using joint configurations (qpos_list)
qpos_list = [
    [0, 0, 0, 0, 0, 0],
    [1, 1, 1, 1, 1, 1]
]
out_qpos_list, out_xpos_list = motion_gen.create_discrete_trajectory(
    qpos_list=qpos_list,
    is_linear=False,
    sample_method=TrajectorySampleMethod.QUANTITY,
    sample_num=20
)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

0,
)
logger.log_warning("Only two same waypoints, returning trivial trajectory.")
return target_states
Comment on lines 282 to 295
if xpos_or_qpos == "xpos":
estimated_num = self.motion_gen.estimate_trajectory_sample_count(
xpos_list=self.xpos_list,
xpos_list=torch.as_tensor(np.array(self.xpos_list)),
step_size=0.01,
angle_step=np.pi / 90,
control_part=self.arm_name,
)
else:
estimated_num = self.motion_gen.estimate_trajectory_sample_count(
qpos_list=self.qpos_list,
qpos_list=torch.as_tensor(np.array(self.qpos_list)),
step_size=0.01,
angle_step=np.pi / 90,
control_part=self.arm_name,
)
Comment on lines +135 to +139
def generate(
self,
target_states: List[PlanState],
options: MotionGenOptions = MotionGenOptions(),
) -> PlanResult:
Comment on lines +94 to +99
@validate_plan_options(options_cls=ToppraPlanOptions)
def plan(
self,
current_state: dict,
target_states: list[dict],
**kwargs,
):
target_states: list[PlanState],
options: ToppraPlanOptions = ToppraPlanOptions(),
) -> PlanResult:
Comment on lines +116 to +131
# set constraints
if isinstance(options.constraints["velocity"], float):
vlims = np.array(
[
[
-options.constraints["velocity"],
options.constraints["velocity"],
]
for _ in range(dofs)
]
)
else:
vlims = np.array(options.constraints["velocity"])

if isinstance(options.constraints["acceleration"], float):
alims = np.array(
Comment on lines +38 to 66
from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg

# Assume you have a robot instance and arm_name
motion_cfg = MotionGenCfg(
planner_cfg=ToppraPlannerCfg(
robot_uid=robot.uid,
control_part=arm_name,
constraints={
"velocity": 0.2,
"acceleration": 0.5,
},
)
)
motion_gen = MotionGenerator(cfg=motion_cfg)

# Plan a joint-space trajectory
current_state = {"position": [0, 0, 0, 0, 0, 0]}
target_states = [{"position": [0.5, 0.2, 0, 0, 0, 0]}]
success, positions, velocities, accelerations, times, duration = motion_gen.plan(
current_state = PlanState(position=np.array([0., 0., 0., 0., 0., 0.]))
target_states = [PlanState(position=np.array([0.5, 0.2, 0., 0., 0., 0.]))]
plan_result = motion_gen.plan(
current_state=current_state,
target_states=target_states
)
success = plan_result.success
positions = plan_result.positions
velocities = plan_result.velocities
accelerations = plan_result.accelerations
times = plan_result.t_series
duration = plan_result.duration

Comment on lines +217 to 220
select_qpos_traj.extend(ret.positions.numpy())
ee_state_list_select.extend([select_arm_current_gripper_state] * len(ret.positions))


Comment on lines +204 to 205
return ret.positions.numpy().T

Comment on lines +165 to +171
@pytest.mark.skip(reason="Skipping CUDA tests temporarily")
class TestCPU(EmbodiedEnvTest):
def setup_method(self):
self.setup_simulation("cpu", enable_rt=False)


@pytest.mark.skip(reason="Skipping CUDA tests temporarily")
Comment on lines 157 to +176
Note:
- Allows 10% tolerance for velocity constraints
- Allows 25% tolerance for acceleration constraints
- Prints exceed information if constraints are violated
- Assumes symmetric constraints (velocities and accelerations can be positive or negative)
- Supports batch dimension computation, e.g. (B, N, DOF) or (N, DOF)
"""
# Convert max_constraints to symmetric format for constraint checking
# This assumes symmetric constraints (common for most planners)
vlims = np.array([[-v, v] for v in self.max_constraints["velocity"]])
alims = np.array([[-a, a] for a in self.max_constraints["acceleration"]])

vel_check = np.all((velocities >= vlims[:, 0]) & (velocities <= vlims[:, 1]))
acc_check = np.all(
(accelerations >= alims[:, 0]) & (accelerations <= alims[:, 1])
device = vels.device

max_vel = torch.tensor(constraints["velocity"], dtype=vels.dtype, device=device)
max_acc = torch.tensor(
constraints["acceleration"], dtype=accs.dtype, device=device
)

# 超限情况
# To support batching, we compute along all dimensions except the last one (DOF)
reduce_dims = tuple(range(vels.ndim - 1))

# Check bounds
vel_check = torch.all(torch.abs(vels) <= max_vel).item()
acc_check = torch.all(torch.abs(accs) <= max_acc).item()
@yuecideng yuecideng merged commit 8b45c1d into main Mar 19, 2026
5 checks passed
@yuecideng yuecideng deleted the cj/update-base-planner branch March 19, 2026 11:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants