1+ """Hardware abstraction layer for the UR5e robot.
2+ Starts a Thread with Multiprocessing to control the robot via RTDE interface in real-time.
3+ Uses shared memory to communicate between the main process and the control process."""
4+
15import multiprocessing as mp
26import time
37import typing
48from dataclasses import dataclass
59from multiprocessing .shared_memory import SharedMemory
6-
710import numpy as np
11+
812import rtde_control
913import rtde_receive
10- from rcs_ur5e import robotiq_gripper
1114
15+ from rcs_ur5e import robotiq_gripper
1216from rcs import common
1317
18+ from enum import IntEnum
19+
20+
21+
1422
1523@dataclass (kw_only = True )
1624class UR5eConfig (common .RobotConfig ):
17- lookahead_time : float = 0.05
18- gain : float = 500.0
25+ # Robot movement parameters
1926 max_velocity : float = 1.0
2027 max_acceleration : float = 1.0
2128 async_control : bool = True
2229 max_servo_joint_step : float = 0.15
2330 max_servo_cartesian_step : float = 0.01
2431
32+ # UR Controller parameters, change with caution
33+ lookahead_time : float = 0.05
34+ gain : float = 500.0
35+
2536 def __post_init__ (self ):
2637 super ().__init__ ()
2738
@@ -32,44 +43,39 @@ def __post_init__(self):
3243 super ().__init__ ()
3344
3445
35- # Define the shared memory size and name outside the class
46+ # Define the shared memory
3647SHM_SIZE = 4 + 1 + 48 + 48 + 48 + 48
3748SHM_NAME = "ur5e_control_shm"
3849
39- NO_MODE = 0
40- JOINT_MODE = 1
41- CARTESIAN_MODE = 2
50+ class ControlMode (IntEnum ):
51+ IDLE = 0
52+ JOINT_MODE = 1
53+ CARTESIAN_MODE = 2
4254
4355
4456def _control_robot (shm_name : str , ip : str , stop_queue : mp .Queue , config_queue : mp .Queue ) -> None :
4557 """
4658 Control loop for the robot, running in a separate process.
47- This function is a helper and not part of the class.
4859 """
4960 robot_config = UR5eConfig ()
5061 try :
5162 # Initialize robot interfaces
52- # TODO(j.hechtl): this is currently blocking if connection fails
5363 ur_control = rtde_control .RTDEControlInterface (ip )
5464 ur_receive = rtde_receive .RTDEReceiveInterface (ip )
5565
5666 if not ur_control .isConnected ():
5767 msg = f"Could not connect to UR5e at { ip } ."
5868 raise ConnectionError (msg )
5969
60- # Attach to the shared memory segment
70+ # Setup Shared Memory
6171 shm = SharedMemory (name = shm_name )
6272 data_buffer = shm .buf
63-
64- # Define offsets for each field in the shared memory buffer
6573 offset_mode = 0
6674 offset_target_reached = offset_mode + 4
6775 offset_joint_target = offset_target_reached + 1
6876 offset_cartesian_target = offset_joint_target + 48
6977 offset_joint_state = offset_cartesian_target + 48
7078 offset_cartesian_state = offset_joint_state + 48
71-
72- # Create numpy views on the shared memory buffer
7379 joint_target_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_target )
7480 cartesian_target_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_cartesian_target )
7581 joint_state_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_state )
@@ -91,7 +97,7 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
9197 cartesian_state = np .array (ur_receive .getActualTCPPose ())
9298 cartesian_state_view [:] = cartesian_state
9399
94- if mode == JOINT_MODE :
100+ if mode == ControlMode . JOINT_MODE :
95101 diff = joint_target_view - joint_state_view
96102 if np .max (np .abs (diff )) < 0.01 :
97103 data_buffer [offset_target_reached ] = 1
@@ -102,25 +108,6 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
102108 else :
103109 target_q = joint_target_view .tolist ()
104110
105- # current_qd = ur_receive.getActualQd()
106- # required_qd = diff / dt
107- # required_qdd = (required_qd - current_qd) / dt
108-
109- # max_qdd = [1,1,1,1,1,1]
110-
111- # factor = 1
112-
113- # for i in range(len(max_qdd)):
114- # if np.abs(required_qdd[i])*factor > max_qdd[i]:
115- # factor = factor * (max_qdd[i]/np.abs(required_qdd))
116- # assert factor <= 1
117- # if factor < 1:
118- # target_qdd = factor * required_qdd
119- # target_qd = current_qd + 0.5 * dt *target_qdd # average velocity in time window
120- # new_target_q = joint_state + target_qd * dt
121- # print("Current q: ", joint_state, " Target q: ", target_q, " New: ", new_target_q)
122-
123- # print("Servo")
124111 ur_control .servoJ (
125112 target_q ,
126113 robot_config .max_velocity ,
@@ -130,7 +117,7 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
130117 robot_config .gain ,
131118 )
132119
133- elif mode == CARTESIAN_MODE :
120+ elif mode == ControlMode . CARTESIAN_MODE :
134121 rotvec = common .RotVec (np .array (cartesian_target_view [3 :6 ]))
135122 # print("Target View: ", cartesian_target_view)
136123 a = common .Pose (quaternion = rotvec .as_quaternion_vector (), translation = cartesian_target_view [:3 ])
@@ -190,15 +177,12 @@ def __init__(self, ip: str):
190177 self ._shm_buffer = self ._shm .buf
191178 self ._stop_queue = mp .Queue ()
192179 self ._config_queue = mp .Queue ()
193-
194- # Define numpy views on the shared memory buffer
195180 self ._offset_mode = 0
196181 self ._offset_target_reached = self ._offset_mode + 4
197182 self ._offset_joint_target = self ._offset_target_reached + 1
198183 self ._offset_cartesian_target = self ._offset_joint_target + 48
199184 self ._offset_joint_state = self ._offset_cartesian_target + 48
200185 self ._offset_cartesian_state = self ._offset_joint_state + 48
201-
202186 self ._joint_target_shm = np .ndarray (
203187 (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_joint_target
204188 )
@@ -212,7 +196,7 @@ def __init__(self, ip: str):
212196 (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_cartesian_state
213197 )
214198
215- # Initialise with -10 to check
199+ # Initialise with -10 to check for first value
216200 self ._joint_state_shm [:] = - 10
217201
218202 # Start the robot control process
@@ -222,7 +206,6 @@ def __init__(self, ip: str):
222206 self ._robot_process .daemon = True # Kills process if main process exits
223207 self ._robot_process .start ()
224208
225- # Check for first update
226209 while self ._joint_state_shm [0 ] == - 10 :
227210 print ("Waiting for first robot state to arrive.." )
228211 time .sleep (1 )
@@ -251,18 +234,17 @@ def get_cartesian_position(self) -> common.Pose:
251234 return common .Pose (rpy_vector = [0 , 0 , np .deg2rad (180 )], translation = [0 , 0 , 0 ]).inverse () * pose
252235
253236 def get_ik (self ) -> common .IK | None :
254-
255237 return None
256238
257239 def get_joint_position (self ) -> np .ndarray [tuple [typing .Literal [6 ]], np .dtype [np .float64 ]]:
258240 return np .array (self ._joint_state_shm )
259241
260- def get_parameters (self ) -> UR5eConfig :
242+ def get_config (self ) -> UR5eConfig :
261243 return self ._config
262244
263- def set_parameters (self , robot_cfg : UR5eConfig ) -> None :
245+ def set_config (self , robot_cfg : UR5eConfig ) -> None :
264246 self ._config = robot_cfg
265- # self._config_queue.put(robot_cfg)
247+ # self._config_queue.put(robot_cfg) #TODO(j.hechtl): uncomment this
266248
267249 def get_state (self ) -> UR5eState :
268250 return UR5eState
@@ -273,19 +255,19 @@ def set_cartesian_position(self, pose: common.Pose) -> None:
273255 print ("IK failed" )
274256 return
275257 self .set_joint_position (q )
276- return
258+ return # TODO(j.hechtl)
277259 self ._shm_buffer [self ._offset_target_reached ] = 0
278260 target_pose = (common .Pose (rpy_vector = [0 , 0 , np .deg2rad (180 )], translation = [0 , 0 , 0 ]) * pose ).rotvec ()
279261 self ._cartesian_target_shm [:] = target_pose
280- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (CARTESIAN_MODE ).to_bytes (4 , "little" )
262+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (ControlMode . CARTESIAN_MODE ).to_bytes (4 , "little" )
281263 if not self ._config .async_control :
282264 while not self ._shm_buffer [self ._offset_target_reached ]:
283265 time .sleep (0.01 )
284266
285267 def set_joint_position (self , q : np .ndarray [tuple [typing .Literal [6 ]], np .dtype [np .float64 ]]) -> None :
286268 self ._shm_buffer [self ._offset_target_reached ] = 0
287269 self ._joint_target_shm [:] = q
288- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
270+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (ControlMode . JOINT_MODE ).to_bytes (4 , "little" )
289271 if not self ._config .async_control :
290272 while not self ._shm_buffer [self ._offset_target_reached ]:
291273 time .sleep (0.01 )
@@ -300,7 +282,7 @@ def move_home(self) -> None:
300282 raise ValueError (msg )
301283 print (f"Moving to home position: { home } " )
302284 self ._joint_target_shm [:] = home
303- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
285+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (ControlMode . JOINT_MODE ).to_bytes (4 , "little" )
304286 while not self ._shm_buffer [self ._offset_target_reached ]:
305287 time .sleep (0.01 )
306288
0 commit comments