From fadfcdf8adb3bf689c6b8d84f8c504e14396b7bd Mon Sep 17 00:00:00 2001 From: jossiewang Date: Mon, 31 Mar 2025 17:20:28 +0800 Subject: [PATCH 01/15] Modified: (lidar) beacon sequence in geometry --- .../lidar_member_function.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 089e3e3..53721b9 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -71,7 +71,7 @@ def __init__(self): # ros debug logger self.get_logger().debug('Lidar Localization Node has been initialized') - self.init_landmarks_map(self.landmarks_map) + self.init_landmarks_map() self.robot_pose = [] self.P_pred = [] self.newPose = False @@ -129,22 +129,26 @@ def set_lidar_side_callback(self, msg): np.array([3.094, 1.948]), np.array([-0.094, 1.0]) ] - self.init_landmarks_map(self.landmarks_map) + self.init_landmarks_map() self.get_logger().debug(f"Set lidar side to {self.side}") else: self.get_logger().warn("Invalid side value") - def init_landmarks_map(self, landmarks_map): - self.landmarks_map = landmarks_map + def init_landmarks_map(self): # calculate the geometry description of landmarks self.geometry_description_map = {} - NUM_LANDMARKS = len(landmarks_map) + NUM_LANDMARKS = len(self.landmarks_map) for i in range(NUM_LANDMARKS): for j in range(i + 1, NUM_LANDMARKS): if i == j: continue - d_ij = np.linalg.norm(landmarks_map[i] - landmarks_map[j]) + d_ij = np.linalg.norm(self.landmarks_map[i] - self.landmarks_map[j]) self.geometry_description_map[(i, j)] = d_ij + # calculate the landmark sequence, mark clockwise or counter-clockwise (clockwise for yellow, counter-clockwise for blue) + if np.cross(self.landmarks_map[1] - self.landmarks_map[0], self.landmarks_map[2] - self.landmarks_map[0]) > 0: + self.geometry_description_map[(3,3)] = 1 # blue + else: + self.geometry_description_map[(3,3)] = -1 # yellow def clear_data(self): self.obs_raw = [] From a1079b72f654eaac8be67a63d9142e43a28a3806 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Tue, 1 Apr 2025 07:30:03 +0800 Subject: [PATCH 02/15] Modified: (lidar) __init__ code organized --- .../lidar_localization_pkg/lidar_member_function.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 53721b9..ddca6fe 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -13,12 +13,15 @@ class LidarLocalization(Node): # inherit from Node def __init__(self): super().__init__('lidar_localization_node') - # Declare parameters + # Declare mode parameters self.declare_parameter('side', 0) self.declare_parameter('debug_mode', False) self.declare_parameter('visualize_candidate', True) + + # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) self.declare_parameter('consistency_threshold', 0.9) + self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix # Get parameters self.side = self.get_parameter('side').get_parameter_value().integer_value @@ -26,6 +29,7 @@ def __init__(self): self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value + self.R = np.array(self.get_parameter('R').get_parameter_value().double_array_value) # Set the landmarks map based on the side if self.side == 0: @@ -46,8 +50,10 @@ def __init__(self): # ros settings self.lidar_pose_pub = self.create_publisher(PoseWithCovarianceStamped, 'lidar_pose', 10) + if self.visualize_candidate: self.circles_pub = self.create_publisher(MarkerArray, 'candidates', 10) + self.subscription = self.create_subscription( Obstacles, 'raw_obstacles', @@ -59,7 +65,6 @@ def __init__(self): self.pred_pose_callback, 10 ) - # subscribe to set_lidar_side topic self.subscription = self.create_subscription( String, 'set_lidar_side', @@ -68,14 +73,12 @@ def __init__(self): ) self.subscription # prevent unused variable warning - # ros debug logger - self.get_logger().debug('Lidar Localization Node has been initialized') + self.get_logger().debug('Lidar Localization Node initialized') self.init_landmarks_map() self.robot_pose = [] self.P_pred = [] self.newPose = False - self.R = np.array([[0.05**2, 0.0], [0.0, 0.05**2]]) # R: measurement noise; TODO: tune the value self.lidar_pose_msg = PoseWithCovarianceStamped() def obstacle_callback(self, msg): From 8249c8510bfb283145554cd6502ed6e07a00fab4 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Tue, 1 Apr 2025 07:33:26 +0800 Subject: [PATCH 03/15] Modified: (lidar) rm unnecessary variable passing --- .../lidar_member_function.py | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index ddca6fe..5d6d243 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -83,29 +83,32 @@ def __init__(self): def obstacle_callback(self, msg): self.get_logger().debug('obstacle detected') - # obstacle operation + self.obs_time = msg.header.stamp + self.obs_raw = [] for obs in msg.circles: self.obs_raw.append(np.array([obs.center.x, obs.center.y])) - self.obs_time = msg.header.stamp - # data processing - if self.newPose == False: # Check if robot_pose or P_pred is empty + + # main: data processing + if self.newPose == False: # Check for new robot pose self.get_logger().debug("no new robot pose or P_pred") return - self.landmarks_candidate = self.get_landmarks_candidate(self.landmarks_map, self.obs_raw) - self.landmarks_set = self.get_landmarks_set(self.landmarks_candidate) + + self.landmarks_candidate = self.get_landmarks_candidate() + + self.landmarks_set = self.get_landmarks_set() if len(self.landmarks_set) == 0: self.get_logger().debug("empty landmarks set") return - self.get_lidar_pose(self.landmarks_set, self.landmarks_map) + self.get_lidar_pose() + # clear used data self.clear_data() def pred_pose_callback(self, msg): - # self.get_logger().debug("Robot pose callback triggered") self.newPose = True - orientation = euler_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # raw, pitch, *yaw - # check orientation range + orientation = yaw_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # raw, pitch, *yaw + # check orientation range TODO: should be -pi to pi?? if orientation < 0: orientation += 2 * np.pi self.robot_pose = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) @@ -161,14 +164,13 @@ def clear_data(self): self.landmarks_set = [] self.newPose = False - def get_obs_candidate(self, landmark, obs_raw): + def get_obs_candidate(self, landmark): obs_candidates = [] x_r, y_r, phi_r = self.robot_pose if self.debug_mode: print(f"Robot pose from TF: {self.robot_pose}") x_o, y_o = landmark r_prime = np.sqrt((x_o - x_r) ** 2 + (y_o - y_r) ** 2) - # theta_rob = np.arctan2( temp = angle_limit_checking(np.arctan2(y_o - y_r, x_o - x_r)) # limit checking is not necessary right? theta_prime = angle_limit_checking(temp - phi_r) @@ -178,12 +180,12 @@ def get_obs_candidate(self, landmark, obs_raw): ]) S = H @ self.P_pred @ H.T + self.R S_inv = np.linalg.inv(S) - S_det = np.linalg.det(S) - marker_id = 0 - marker_array = MarkerArray() + if self.visualize_candidate: + marker_id = 0 + marker_array = MarkerArray() - for obs in obs_raw: + for obs in self.obs_raw: r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) theta_z = np.arctan2(obs[1], obs[0]) y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)]) @@ -233,14 +235,15 @@ def get_obs_candidate(self, landmark, obs_raw): return obs_candidates - def get_landmarks_candidate(self, landmarks_map, obs_raw): + def get_landmarks_candidate(self): + landmarks_map = self.landmarks_map landmarks_candidate = [] self.beacon_no = 0 for landmark in landmarks_map: self.beacon_no += 1 candidate = { 'landmark': landmark, - 'obs_candidates': self.get_obs_candidate(landmark, obs_raw) + 'obs_candidates': self.get_obs_candidate(landmark) } landmarks_candidate.append(candidate) # print landmarks_candidate for debug @@ -251,7 +254,8 @@ def get_landmarks_candidate(self, landmarks_map, obs_raw): print(f"Obs {j + 1}: {obs_candidate['position']} with probability {obs_candidate['probability']}") return landmarks_candidate - def get_landmarks_set(self, landmarks_candidate): + def get_landmarks_set(self): + landmarks_candidate = self.landmarks_candidate landmarks_set = [] for i in range(len(landmarks_candidate[0]['obs_candidates'])): for j in range(len(landmarks_candidate[1]['obs_candidates'])): @@ -281,11 +285,13 @@ def get_landmarks_set(self, landmarks_candidate): return landmarks_set - def get_lidar_pose(self, landmarks_set, landmarks_map): + def get_lidar_pose(self): + landmarks_map = self.landmarks_map + landmarks_set = self.landmarks_set if not landmarks_set: raise ValueError("landmarks_set is empty") # prefer the set with more beacons - landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) + # landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) # activate this if there's one more beacon # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort? max_likelihood = max(set['probability_set'] for set in landmarks_set) max_likelihood_idx = next(i for i, set in enumerate(landmarks_set) if set['probability_set'] == max_likelihood) @@ -432,14 +438,7 @@ def quaternion_from_euler(ai, aj, ak): q[3] = cj * cc + sj * ss return q -def euler_from_quaternion(x, y, z, w): - t0 = +2.0 * (w * x + y * z) - t1 = +1.0 - 2.0 * (x * x + y * y) - roll_x = np.arctan2(t0, t1) - t2 = +2.0 * (w * y - z * x) - t2 = +1.0 if t2 > +1.0 else t2 - t2 = -1.0 if t2 < -1.0 else t2 - pitch_y = np.arcsin(t2) +def yaw_from_quaternion(x, y, z, w): t3 = +2.0 * (w * z + x * y) t4 = +1.0 - 2.0 * (y * y + z * z) yaw_z = np.arctan2(t3, t4) From bfa896748df33b759e70b45565738c9bcdcb8e79 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Thu, 10 Apr 2025 23:22:43 +0800 Subject: [PATCH 04/15] Add: lidar update with two beacons (untested) --- .../lidar_member_function.py | 123 ++++++++++++------ 1 file changed, 84 insertions(+), 39 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 5d6d243..172b07b 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -20,7 +20,9 @@ def __init__(self): # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) + self.declare_parameter('likelihood_threshold_two', 0.1) self.declare_parameter('consistency_threshold', 0.9) + self.declare_parameter('consistency_threshold_two', 0.99) self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix # Get parameters @@ -28,7 +30,9 @@ def __init__(self): self.debug_mode = self.get_parameter('debug_mode').get_parameter_value().bool_value self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value + self.likelihood_threshold = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value + self.consistency_threshold = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value self.R = np.array(self.get_parameter('R').get_parameter_value().double_array_value) # Set the landmarks map based on the side @@ -193,45 +197,6 @@ def get_obs_candidate(self, landmark): likelihood = np.exp(-0.5 * di_square) if likelihood > self.likelihood_threshold: obs_candidates.append({'position': obs, 'probability': likelihood}) - # if self.visualize_candidate and self.beacon_no == 1: - # marker = Marker() - # marker.header.frame_id = "robot_predict" - # marker.header.stamp = self.get_clock().now().to_msg() - # marker.ns = "candidates" - # marker.type = Marker.SPHERE - # marker.action = Marker.ADD - # marker.scale.x = 0.1 - # marker.scale.y = 0.1 - # marker.scale.z = 0.01 - - # text_marker = Marker() - # text_marker.header.frame_id = "robot_predict" - # text_marker.header.stamp = self.get_clock().now().to_msg() - # text_marker.ns = "text" - # text_marker.type = Marker.TEXT_VIEW_FACING - # text_marker.action = Marker.ADD - # text_marker.scale.z = 0.1 - # text_marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) # White text - - # # use visualization_msgs to visualize the likelihood - # marker.pose.position.x = obs[0] - # marker.pose.position.y = obs[1] - # marker.pose.position.z = 0.0 - # marker.color = ColorRGBA(r=0.0, g=0.5, b=1.0, a=likelihood) - # marker_id += 1 - # marker.id = marker_id - # marker_array.markers.append(marker) - # text_marker.pose.position.x = obs[0] - # text_marker.pose.position.y = obs[1] - # text_marker.pose.position.z = 0.1 - # text_marker.text = f"{likelihood:.2f}" - # text_marker.id = marker_id - # marker_array.markers.append(text_marker) - # if self.visualize_candidate and self.beacon_no == 1: - # self.circles_pub.publish(marker_array) - # self.get_logger().debug("Published marker array") - # # clean up - # marker_array.markers.clear() return obs_candidates @@ -257,6 +222,13 @@ def get_landmarks_candidate(self): def get_landmarks_set(self): landmarks_candidate = self.landmarks_candidate landmarks_set = [] + landmarks_with_candidates = 0 + for i in 3: + if len(landmarks_candidate[i]['obs_candidates']) > 0: + landmarks_with_candidates += 1 + if landmarks_with_candidates < 3: + self.get_two_beacons() + return for i in range(len(landmarks_candidate[0]['obs_candidates'])): for j in range(len(landmarks_candidate[1]['obs_candidates'])): for k in range(len(landmarks_candidate[2]['obs_candidates'])): @@ -416,6 +388,79 @@ def get_geometry_consistency(self, beacons): consistency = 0 return consistency + + def get_two_beacons(self): + two_index = [] + for i in 3: + if len(self.landmarks_candidate[i]['obs_candidates']) > 0: + two_index.append(i) + if len(two_index) < 2: + self.get_logger().debug("not enough beacons") + return + # check geometry consistency + nominal_distance = np.linalg.norm(self.landmarks_map[two_index[0]] - self.landmarks_map[two_index[1]]) + for i in range(len(self.landmarks_candidate[two_index[0]]['obs_candidates'])): + for j in range(len(self.landmarks_candidate[two_index[1]]['obs_candidates'])): + set = { + 'beacons': { + 0: self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['position'], + 1: self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['position'] + } + } + # consistency of the set + set['consistency'] = nominal_distance / np.linalg.norm(set['beacons'][0] - set['beacons'][1]) + if set['consistency'] < self.consistency_threshold_two: + self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold}: {set['consistency']}") + continue + # probability of the set + set['probability_set'] = self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['probability'] * self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['probability'] + if set['probability_set'] < self.likelihood_threshold_two: + self.get_logger().debug(f"Probability is less than {self.likelihood_threshold}: {set['probability_set']}") + continue + self.landmarks_set.append(set) + # print landmarks_set for debug + if self.debug_mode: + for i, set in enumerate(self.landmarks_set): + print(f"Set {i + 1}:") + print(f"Probability: {set['probability_set']}") + print(f"Geometry Consistency: {set['consistency']}") + + # use the set with the highest probability_set + max_likelihood = max(set['probability_set'] for set in self.landmarks_set) + max_likelihood_idx = next(i for i, set in enumerate(self.landmarks_set) if set['probability_set'] == max_likelihood) + beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][i] for i in range(2)] + # calculate the lidar pose (TODO) + # take the average of the position given by the two beacons + pose_1 = self.landmarks_map[two_index[0]] - self.landmarks_set[max_likelihood_idx]['beacons'][0] + pose_2 = self.landmarks_map[two_index[1]] - self.landmarks_set[max_likelihood_idx]['beacons'][1] + lidar_pose = (pose_1 + pose_2) / 2 + lidar_pose[2] = angle_limit_checking(np.arctan2(pose_1[1], pose_1[0]) - np.arctan2(beacons[0][1], beacons[0][0])) + lidar_cov = np.diag([0.05**2, 0.05**2, 0.05**2]) # what should the optimal value be? + lidar_cov[0, 0] /= max_likelihood + lidar_cov[1, 1] /= max_likelihood + lidar_cov[2, 2] /= max_likelihood + # publish the lidar pose + self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() # TODO: compensation + self.lidar_pose_msg.header.frame_id = 'map' #TODO: param + self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] + self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] + self.lidar_pose_msg.pose.pose.position.z = 0.0 + self.lidar_pose_msg.pose.pose.orientation.x = 0.0 + self.lidar_pose_msg.pose.pose.orientation.y = 0.0 + self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.covariance = [ + lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] + ] + # self.get_logger().debug(f"lidar_pose: {lidar_pose}") + self.lidar_pose_pub.publish(self.lidar_pose_msg) + # self.get_logger().debug("Published lidar_pose message") + def quaternion_from_euler(ai, aj, ak): ai /= 2.0 From 7ce246febf45bd4b710a2cc68866e763bff885e4 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Thu, 17 Apr 2025 19:46:15 +0800 Subject: [PATCH 05/15] Fixed: (lidar) param name for two_beacon update --- .../lidar_member_function.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index f572f73..b6139b6 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -20,9 +20,9 @@ def __init__(self): # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) - self.declare_parameter('likelihood_threshold_two', 0.1) + self.declare_parameter('likelihood_threshold_two', 0.01) self.declare_parameter('consistency_threshold', 0.9) - self.declare_parameter('consistency_threshold_two', 0.99) + self.declare_parameter('consistency_threshold_two', 0.95) self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix # Get parameters @@ -30,9 +30,9 @@ def __init__(self): self.debug_mode = self.get_parameter('debug_mode').get_parameter_value().bool_value self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value - self.likelihood_threshold = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value + self.likelihood_threshold_two = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value - self.consistency_threshold = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value + self.consistency_threshold_two = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value self.R = np.array(self.get_parameter('R').get_parameter_value().double_array_value) self.adjust_factor = 1 @@ -187,9 +187,6 @@ def get_obs_candidate(self, landmark): S = H @ self.P_pred @ H.T + self.R S_inv = np.linalg.inv(S) - if self.visualize_candidate: - marker_id = 0 - marker_array = MarkerArray() for obs in self.obs_raw: r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) @@ -444,9 +441,10 @@ def get_two_beacons(self): beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][i] for i in range(2)] # calculate the lidar pose (TODO) # take the average of the position given by the two beacons + lidar_pose = np.zeros(3) # Ensure lidar_pose has three elements pose_1 = self.landmarks_map[two_index[0]] - self.landmarks_set[max_likelihood_idx]['beacons'][0] pose_2 = self.landmarks_map[two_index[1]] - self.landmarks_set[max_likelihood_idx]['beacons'][1] - lidar_pose = (pose_1 + pose_2) / 2 + lidar_pose[:2] = (pose_1 + pose_2) / 2 # Assign x and y to the first two elements lidar_pose[2] = angle_limit_checking(np.arctan2(pose_1[1], pose_1[0]) - np.arctan2(beacons[0][1], beacons[0][0])) lidar_cov = np.diag([0.05**2, 0.05**2, 0.05**2]) # what should the optimal value be? lidar_cov[0, 0] /= max_likelihood From 79250ea71d6f8ca47f177305ee6fe81cd7441d49 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Fri, 18 Apr 2025 14:37:07 +0800 Subject: [PATCH 06/15] Modified: (lidar) two_beacons with TF --- .../lidar_member_function.py | 154 ++++++++++++++---- 1 file changed, 119 insertions(+), 35 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index b6139b6..7b89a91 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -1,6 +1,8 @@ import rclpy from rclpy.node import Node +from tf2_ros import Buffer, TransformListener, LookupException, ConnectivityException, ExtrapolationException + from geometry_msgs.msg import PoseWithCovarianceStamped from obstacle_detector.msg import Obstacles from visualization_msgs.msg import Marker, MarkerArray @@ -20,9 +22,11 @@ def __init__(self): # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) - self.declare_parameter('likelihood_threshold_two', 0.01) + self.declare_parameter('likelihood_threshold_two', 0.8) self.declare_parameter('consistency_threshold', 0.9) - self.declare_parameter('consistency_threshold_two', 0.95) + self.declare_parameter('consistency_threshold_two', 0.99) + self.declare_parameter('robot_frame_id', 'base_footprint') + self.declare_parameter('robot_parent_frame_id', 'map') self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix # Get parameters @@ -33,6 +37,8 @@ def __init__(self): self.likelihood_threshold_two = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value self.consistency_threshold_two = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value + self.robot_frame_id = self.get_parameter('robot_frame_id').get_parameter_value().string_value + self.robot_parent_frame_id = self.get_parameter('robot_parent_frame_id').get_parameter_value().string_value self.R = np.array(self.get_parameter('R').get_parameter_value().double_array_value) self.adjust_factor = 1 @@ -65,7 +71,7 @@ def __init__(self): 'raw_obstacles', self.obstacle_callback, 10) - self.subscription = self.create_subscription( + self.subscription = self.create_subscription( # if TF is not available PoseWithCovarianceStamped, 'final_pose', self.pred_pose_callback, @@ -79,33 +85,56 @@ def __init__(self): ) self.subscription # prevent unused variable warning + # tf2 buffer + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self) + self.get_logger().debug('Lidar Localization Node initialized') self.init_landmarks_map() self.robot_pose = [] - self.P_pred = [] + self.robot_pose_topic = [] + self.P_pred = np.array([[0.05**2, 0.0, 0.0], [0.0, 0.05**2, 0.0], [0.0, 0.0, 0.1]]) + self.P_pred_topic = [] self.newPose = False self.lidar_pose_msg = PoseWithCovarianceStamped() - + def obstacle_callback(self, msg): - self.get_logger().debug('obstacle detected') self.obs_time = msg.header.stamp - self.obs_raw = [] + + # get latest robot pose + if self.get_robot_pose() == -1: + self.get_logger().debug("no new robot pose") + return + + # get the obstacles for obs in msg.circles: self.obs_raw.append(np.array([obs.center.x, obs.center.y])) # main: data processing - if self.newPose == False: # Check for new robot pose - self.get_logger().debug("no new robot pose or P_pred") - return - self.landmarks_candidate = self.get_landmarks_candidate() - self.landmarks_set = self.get_landmarks_set() - if len(self.landmarks_set) == 0: + # if each of the three beacon has at least one candidate + landmarks_with_candidates = 0 + for i in range(3): + if len(self.landmarks_candidate[i]['obs_candidates']) > 0: + landmarks_with_candidates += 1 + if landmarks_with_candidates == 3: + self.landmarks_set = self.get_landmarks_set() + elif landmarks_with_candidates == 2: + self.get_two_beacons() # or devide into more steps? + return + else: + self.get_logger().warn("less than two landmarks") + return + + # for the normal case complete three landmarks + if len(self.landmarks_set) == 0: # when fail to test consistency and likelihood check self.get_logger().debug("empty landmarks set") + self.get_two_beacons() return + self.get_lidar_pose() # clear used data @@ -117,13 +146,57 @@ def pred_pose_callback(self, msg): # check orientation range TODO: should be -pi to pi?? if orientation < 0: orientation += 2 * np.pi - self.robot_pose = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) - self.P_pred = np.array([ + self.robot_pose_topic = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) + self.P_pred_topic = np.array([ [msg.pose.covariance[0]*100, 0, 0], [0, msg.pose.covariance[7]*100, 0], [0, 0, msg.pose.covariance[35]*1e6] ]) + def get_robot_pose(self): + # check if TF is available. If true, use the TF. If false, use the latest topic + try: + self.predict_transform = self.tf_buffer.lookup_transform( + self.robot_parent_frame_id, + self.robot_frame_id, + self.obs_time + ) + self.robot_pose = np.array([ + self.predict_transform.transform.translation.x, + self.predict_transform.transform.translation.y, + yaw_from_quaternion( + self.predict_transform.transform.rotation.x, + self.predict_transform.transform.rotation.y, + self.predict_transform.transform.rotation.z, + self.predict_transform.transform.rotation.w + ) + ]) + self.P_pred = np.array([ + [self.P_pred_topic[0][0], 0, 0], + [0, self.P_pred_topic[1][1], 0], + [0, 0, self.P_pred_topic[2][2]] + ]) + return 0 + except (LookupException, ConnectivityException, ExtrapolationException) as e: + self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}') + self.get_logger().info("now try to use the latest topic") + if self.newPose == False: + self.get_logger().error("no new predict topic, skip.") + return -1 + else: + self.robot_pose = np.array([ + self.robot_pose_topic[0], + self.robot_pose_topic[1], + self.robot_pose_topic[2] + ]) + self.P_pred = np.array([ + [self.P_pred_topic[0][0], 0, 0], + [0, self.P_pred_topic[1][1], 0], + [0, 0, self.P_pred_topic[2][2]] + ]) + self.get_logger().info("use the latest topic") + return 0 + def set_lidar_side_callback(self, msg): side = msg.data.lower() if side in ['0', '1', 'yellow', 'blue']: @@ -184,6 +257,9 @@ def get_obs_candidate(self, landmark): [-(x_o - x_r) / r_prime, -(y_o - y_r) / r_prime, 0], [(y_o - y_r) / r_prime ** 2, -(x_o - x_r) / r_prime ** 2, -1] ]) + if self.P_pred.size == 0 or self.P_pred.shape != (3, 3): + self.get_logger().error("P_pred is not properly initialized") + return [] S = H @ self.P_pred @ H.T + self.R S_inv = np.linalg.inv(S) @@ -232,13 +308,7 @@ def get_landmarks_candidate(self): def get_landmarks_set(self): landmarks_candidate = self.landmarks_candidate landmarks_set = [] - landmarks_with_candidates = 0 - for i in 3: - if len(landmarks_candidate[i]['obs_candidates']) > 0: - landmarks_with_candidates += 1 - if landmarks_with_candidates < 3: - self.get_two_beacons() - return + for i in range(len(landmarks_candidate[0]['obs_candidates'])): for j in range(len(landmarks_candidate[1]['obs_candidates'])): for k in range(len(landmarks_candidate[2]['obs_candidates'])): @@ -270,12 +340,13 @@ def get_landmarks_set(self): def get_lidar_pose(self): landmarks_map = self.landmarks_map landmarks_set = self.landmarks_set - if not landmarks_set: - raise ValueError("landmarks_set is empty") + # if not landmarks_set: + # raise ValueError("landmarks_set is empty") + # prefer the set with more beacons # landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) # activate this if there's one more beacon - # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort? - max_likelihood = max(set['probability_set'] for set in landmarks_set) + # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort? + max_likelihood = max(set['probability_set'] for set in landmarks_set) # TODO: sort by consistency or likelihood? max_likelihood_idx = next(i for i, set in enumerate(landmarks_set) if set['probability_set'] == max_likelihood) lidar_pose = np.zeros(3) @@ -299,6 +370,8 @@ def get_lidar_pose(self): try: X = np.linalg.solve(A.T @ A, A.T @ b) if X[0] < 0 or X[0] > 3 or X[1] < 0 or X[1] > 2: + # result is not in boundary area, try to use two beacons + self.get_two_beacons() return lidar_pose[0] = X[0] lidar_pose[1] = X[1] @@ -401,10 +474,11 @@ def get_geometry_consistency(self, beacons): def get_two_beacons(self): two_index = [] - for i in 3: + self.get_logger().info("get two beacons") + for i in range(3): if len(self.landmarks_candidate[i]['obs_candidates']) > 0: two_index.append(i) - if len(two_index) < 2: + if len(two_index) < 2: # should not require this self.get_logger().debug("not enough beacons") return # check geometry consistency @@ -418,7 +492,7 @@ def get_two_beacons(self): } } # consistency of the set - set['consistency'] = nominal_distance / np.linalg.norm(set['beacons'][0] - set['beacons'][1]) + set['consistency'] = np.linalg.norm(set['beacons'][0] - set['beacons'][1]) / nominal_distance if set['consistency'] < self.consistency_threshold_two: self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold}: {set['consistency']}") continue @@ -435,23 +509,31 @@ def get_two_beacons(self): print(f"Probability: {set['probability_set']}") print(f"Geometry Consistency: {set['consistency']}") + # check if there is valid set + if len(self.landmarks_set) == 0: + self.get_logger().debug("no valid set") + return # use the set with the highest probability_set max_likelihood = max(set['probability_set'] for set in self.landmarks_set) max_likelihood_idx = next(i for i, set in enumerate(self.landmarks_set) if set['probability_set'] == max_likelihood) - beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][i] for i in range(2)] - # calculate the lidar pose (TODO) + beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][two_index[0]], self.landmarks_set[max_likelihood_idx]['beacons'][two_index[1]]] + # calculate the lidar pose # take the average of the position given by the two beacons - lidar_pose = np.zeros(3) # Ensure lidar_pose has three elements + lidar_pose = np.zeros(3) # Ensure that lidar_pose has three elements pose_1 = self.landmarks_map[two_index[0]] - self.landmarks_set[max_likelihood_idx]['beacons'][0] pose_2 = self.landmarks_map[two_index[1]] - self.landmarks_set[max_likelihood_idx]['beacons'][1] lidar_pose[:2] = (pose_1 + pose_2) / 2 # Assign x and y to the first two elements lidar_pose[2] = angle_limit_checking(np.arctan2(pose_1[1], pose_1[0]) - np.arctan2(beacons[0][1], beacons[0][0])) - lidar_cov = np.diag([0.05**2, 0.05**2, 0.05**2]) # what should the optimal value be? + # check if the lidar pose is in the map + if lidar_pose[0] < 0 or lidar_pose[0] > 3 or lidar_pose[1] < 0 or lidar_pose[1] > 2: + self.get_logger().debug("lidar pose is out of map") + return + lidar_cov = np.diag([0.05**2, 0.05**2, 0.05**2]) lidar_cov[0, 0] /= max_likelihood lidar_cov[1, 1] /= max_likelihood lidar_cov[2, 2] /= max_likelihood # publish the lidar pose - self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() # TODO: compensation + self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() self.lidar_pose_msg.header.frame_id = 'map' #TODO: param self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] @@ -468,7 +550,9 @@ def get_two_beacons(self): 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] ] - # self.get_logger().debug(f"lidar_pose: {lidar_pose}") + # also print the likelihood + self.get_logger().info(f"lidar_pose (two): {lidar_pose}") + self.get_logger().info(f"lidar_pose (two) likelihood: {max_likelihood}") self.lidar_pose_pub.publish(self.lidar_pose_msg) # self.get_logger().debug("Published lidar_pose message") From 7a7a153b421b92a97e10bd5b7a1fe2d419b7ab35 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Fri, 18 Apr 2025 15:32:10 +0800 Subject: [PATCH 07/15] Added: (lidar) visulize and publish functions Modified: (lidar) use topic covariance Fixed: (lidar) initialized landmark_set --- .../lidar_member_function.py | 148 +++++++++--------- 1 file changed, 74 insertions(+), 74 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 7b89a91..76eff57 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -92,6 +92,7 @@ def __init__(self): self.get_logger().debug('Lidar Localization Node initialized') self.init_landmarks_map() + self.landmarks_set = [] self.robot_pose = [] self.robot_pose_topic = [] self.P_pred = np.array([[0.05**2, 0.0, 0.0], [0.0, 0.05**2, 0.0], [0.0, 0.0, 0.1]]) @@ -147,7 +148,7 @@ def pred_pose_callback(self, msg): if orientation < 0: orientation += 2 * np.pi self.robot_pose_topic = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) - self.P_pred_topic = np.array([ + self.P_pred = np.array([ [msg.pose.covariance[0]*100, 0, 0], [0, msg.pose.covariance[7]*100, 0], [0, 0, msg.pose.covariance[35]*1e6] @@ -171,11 +172,6 @@ def get_robot_pose(self): self.predict_transform.transform.rotation.w ) ]) - self.P_pred = np.array([ - [self.P_pred_topic[0][0], 0, 0], - [0, self.P_pred_topic[1][1], 0], - [0, 0, self.P_pred_topic[2][2]] - ]) return 0 except (LookupException, ConnectivityException, ExtrapolationException) as e: self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}') @@ -189,11 +185,6 @@ def get_robot_pose(self): self.robot_pose_topic[1], self.robot_pose_topic[2] ]) - self.P_pred = np.array([ - [self.P_pred_topic[0][0], 0, 0], - [0, self.P_pred_topic[1][1], 0], - [0, 0, self.P_pred_topic[2][2]] - ]) self.get_logger().info("use the latest topic") return 0 @@ -390,60 +381,62 @@ def get_lidar_pose(self): lidar_cov[1, 1] /= max_likelihood lidar_cov[2, 2] /= max_likelihood - # publish the lidar pose - self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() # TODO: compensation - self.lidar_pose_msg.header.frame_id = 'map' #TODO: param - self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] - self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] - self.lidar_pose_msg.pose.pose.position.z = 0.0 - self.lidar_pose_msg.pose.pose.orientation.x = 0.0 - self.lidar_pose_msg.pose.pose.orientation.y = 0.0 - self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.covariance = [ - lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] - ] - # self.get_logger().debug(f"lidar_pose: {lidar_pose}") - self.lidar_pose_pub.publish(self.lidar_pose_msg) - # self.get_logger().debug("Published lidar_pose message") + self.pub_lidar_pose(lidar_pose, lidar_cov) except np.linalg.LinAlgError as e: self.get_logger().warn("Linear algebra error: {}".format(e)) # use markerarray to show the landmarks it used if self.visualize_candidate: - marker_array = MarkerArray() - for i, beacon in enumerate(beacons): - marker = Marker() - marker.header.frame_id = "base_footprint" - marker.header.stamp = self.get_clock().now().to_msg() - marker.ns = "chosen_landmarks" - marker.type = Marker.SPHERE - marker.action = Marker.ADD - marker.scale.x = 0.1 - marker.scale.y = 0.1 - marker.scale.z = 0.01 - marker.pose.position.x = beacon[0] - marker.pose.position.y = beacon[1] - marker.pose.position.z = 0.0 - marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0) - marker.id = i - marker_array.markers.append(marker) - self.circles_pub.publish(marker_array) - self.get_logger().debug("Published marker array") - # clean up - marker_array.markers.clear() + self.visualize_candidates(beacons, max_likelihood, landmarks_set[max_likelihood_idx]['consistency']) else: self.get_logger().debug("not enough beacons") return lidar_pose, lidar_cov + def visualize_candidates(self, beacons, max_likelihood, consistency): + marker_array = MarkerArray() + for i, beacon in enumerate(beacons): + marker = Marker() + marker.header.frame_id = "base_footprint" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "chosen_landmarks" + marker.type = Marker.SPHERE + marker.action = Marker.ADD + marker.scale.x = 0.1 + marker.scale.y = 0.1 + marker.scale.z = 0.01 + marker.pose.position.x = beacon[0] + marker.pose.position.y = beacon[1] + marker.pose.position.z = 0.0 + if len(beacons) == 3: + marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0) + elif len(beacons) == 2: + marker.color = ColorRGBA(r=0.5, g=0.5, b=0.0, a=1.0) + marker.id = i + marker_array.markers.append(marker) + # add the max_likelihood, consistency to the marker array + marker = Marker() + marker.header.frame_id = "map" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "set_param" + marker.type = Marker.TEXT_VIEW_FACING + marker.action = Marker.ADD + marker.scale.z = 0.1 + marker.pose.position.x = 3.0 + marker.pose.position.y = 2.0 + marker.pose.position.z = 0.5 + marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) + marker.text = f"max_likelihood: {max_likelihood:.2f}, consistency: {consistency:.2f}" + marker.id = 10 + marker_array.markers.append(marker) + # publish the marker array + self.circles_pub.publish(marker_array) + self.get_logger().debug("Published marker array") + # clean up + marker_array.markers.clear() + def get_geometry_consistency(self, beacons): geometry_description = {} consistency = 1.0 @@ -472,6 +465,27 @@ def get_geometry_consistency(self, beacons): return consistency + def pub_lidar_pose(self, lidar_pose, lidar_cov): + # publish the lidar pose + self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() + self.lidar_pose_msg.header.frame_id = 'map' #TODO: param + self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] + self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] + self.lidar_pose_msg.pose.pose.position.z = 0.0 + self.lidar_pose_msg.pose.pose.orientation.x = 0.0 + self.lidar_pose_msg.pose.pose.orientation.y = 0.0 + self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.covariance = [ + lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] + ] + self.lidar_pose_pub.publish(self.lidar_pose_msg) + def get_two_beacons(self): two_index = [] self.get_logger().info("get two beacons") @@ -492,7 +506,7 @@ def get_two_beacons(self): } } # consistency of the set - set['consistency'] = np.linalg.norm(set['beacons'][0] - set['beacons'][1]) / nominal_distance + set['consistency'] = 1 - abs(np.linalg.norm(set['beacons'][0] - set['beacons'][1]) - nominal_distance) / nominal_distance if set['consistency'] < self.consistency_threshold_two: self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold}: {set['consistency']}") continue @@ -511,7 +525,7 @@ def get_two_beacons(self): # check if there is valid set if len(self.landmarks_set) == 0: - self.get_logger().debug("no valid set") + self.get_logger().warn("no valid set") return # use the set with the highest probability_set max_likelihood = max(set['probability_set'] for set in self.landmarks_set) @@ -533,29 +547,15 @@ def get_two_beacons(self): lidar_cov[1, 1] /= max_likelihood lidar_cov[2, 2] /= max_likelihood # publish the lidar pose - self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() - self.lidar_pose_msg.header.frame_id = 'map' #TODO: param - self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] - self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] - self.lidar_pose_msg.pose.pose.position.z = 0.0 - self.lidar_pose_msg.pose.pose.orientation.x = 0.0 - self.lidar_pose_msg.pose.pose.orientation.y = 0.0 - self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.covariance = [ - lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] - ] + self.pub_lidar_pose(lidar_pose, lidar_cov) + if self.visualize_candidate: + self.visualize_candidates(beacons, max_likelihood, self.landmarks_set[max_likelihood_idx]['consistency']) + # also print the likelihood self.get_logger().info(f"lidar_pose (two): {lidar_pose}") self.get_logger().info(f"lidar_pose (two) likelihood: {max_likelihood}") - self.lidar_pose_pub.publish(self.lidar_pose_msg) - # self.get_logger().debug("Published lidar_pose message") + self.clear_data() def quaternion_from_euler(ai, aj, ak): ai /= 2.0 From 8cb1f0ce03a977a7bf61550d3b5b67d8a7462609 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Fri, 18 Apr 2025 16:35:18 +0800 Subject: [PATCH 08/15] Add: (lidar) visualize candidates --- .../lidar_member_function.py | 101 ++++++++++++++++-- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 76eff57..33d2be9 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -22,9 +22,9 @@ def __init__(self): # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) - self.declare_parameter('likelihood_threshold_two', 0.8) + self.declare_parameter('likelihood_threshold_two', 0.5) self.declare_parameter('consistency_threshold', 0.9) - self.declare_parameter('consistency_threshold_two', 0.99) + self.declare_parameter('consistency_threshold_two', 0.97) self.declare_parameter('robot_frame_id', 'base_footprint') self.declare_parameter('robot_parent_frame_id', 'map') self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix @@ -32,7 +32,7 @@ def __init__(self): # Get parameters self.side = self.get_parameter('side').get_parameter_value().integer_value self.debug_mode = self.get_parameter('debug_mode').get_parameter_value().bool_value - self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value + self.visualize_true = self.get_parameter('visualize_candidate').get_parameter_value().bool_value self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value self.likelihood_threshold_two = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value @@ -63,7 +63,10 @@ def __init__(self): # ros settings self.lidar_pose_pub = self.create_publisher(PoseWithCovarianceStamped, 'lidar_pose', 10) - if self.visualize_candidate: + if self.visualize_true: + self.marker_array = MarkerArray() + self.marker_num_pre = np.array([0, 0, 0]) + self.marker_id = 0 self.circles_pub = self.create_publisher(MarkerArray, 'candidates', 10) self.subscription = self.create_subscription( @@ -274,6 +277,8 @@ def get_obs_candidate(self, landmark): likelihood = np.exp(-0.5 * di_square) # self.get_logger().info(f"adjLikelihood: {likelihood}, obs:{obs}") obs_candidates.append({'position': obs, 'probability': likelihood}) + if self.visualize_true: + self.visualize_candidates(obs, likelihood) return obs_candidates @@ -283,11 +288,18 @@ def get_landmarks_candidate(self): self.beacon_no = 0 for landmark in landmarks_map: self.beacon_no += 1 + self.marker_id = 0 candidate = { 'landmark': landmark, 'obs_candidates': self.get_obs_candidate(landmark) } landmarks_candidate.append(candidate) + + if self.visualize_true: + self.circles_pub.publish(self.marker_array) + # self.get_logger().debug("Published marker array") + self.marker_array.markers.clear() # clean up (is this enough?) + # print landmarks_candidate for debug if self.debug_mode: for i, landmark in enumerate(landmarks_candidate): @@ -387,15 +399,15 @@ def get_lidar_pose(self): self.get_logger().warn("Linear algebra error: {}".format(e)) # use markerarray to show the landmarks it used - if self.visualize_candidate: - self.visualize_candidates(beacons, max_likelihood, landmarks_set[max_likelihood_idx]['consistency']) + if self.visualize_true: + self.visualize_sets(beacons, max_likelihood, landmarks_set[max_likelihood_idx]['consistency']) else: self.get_logger().debug("not enough beacons") return lidar_pose, lidar_cov - def visualize_candidates(self, beacons, max_likelihood, consistency): + def visualize_sets(self, beacons, max_likelihood, consistency): marker_array = MarkerArray() for i, beacon in enumerate(beacons): marker = Marker() @@ -424,8 +436,8 @@ def visualize_candidates(self, beacons, max_likelihood, consistency): marker.type = Marker.TEXT_VIEW_FACING marker.action = Marker.ADD marker.scale.z = 0.1 - marker.pose.position.x = 3.0 - marker.pose.position.y = 2.0 + marker.pose.position.x = 3.5 + marker.pose.position.y = 2.5 marker.pose.position.z = 0.5 marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) marker.text = f"max_likelihood: {max_likelihood:.2f}, consistency: {consistency:.2f}" @@ -437,6 +449,73 @@ def visualize_candidates(self, beacons, max_likelihood, consistency): # clean up marker_array.markers.clear() + def visualize_candidates(self, obs, likelihood): + + self.marker_id += 1 + + # circles + marker = Marker() + marker.header.frame_id = "base_footprint" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = f"candidates_circle{self.beacon_no}" + marker.type = Marker.SPHERE + marker.action = Marker.ADD + marker.scale.x = 0.1 + marker.scale.y = 0.1 + marker.scale.z = 0.01 + + marker.pose.position.x = obs[0] + marker.pose.position.y = obs[1] + marker.pose.position.z = 0.0 + if self.beacon_no == 1: + marker.color = ColorRGBA(r=1.0, g=0.0, b=0.0, a=likelihood) # Red for beacon 1 + elif self.beacon_no == 2: + marker.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=likelihood) # Green for beacon 2 + elif self.beacon_no == 3: + marker.color = ColorRGBA(r=0.0, g=0.0, b=1.0, a=likelihood) # Blue for beacon 3 + marker.id = self.marker_id + self.marker_array.markers.append(marker) + + # texts + text_marker = Marker() + text_marker.header.frame_id = "base_footprint" + text_marker.header.stamp = self.get_clock().now().to_msg() + text_marker.ns = f"candidates_text{self.beacon_no}" + text_marker.type = Marker.TEXT_VIEW_FACING + text_marker.action = Marker.ADD + text_marker.scale.z = 0.1 + text_marker.color = ColorRGBA(r=0.0, g=0.0, b=0.0, a=0.2) # White text + + text_marker.pose.position.x = obs[0] + 0.2 + text_marker.pose.position.y = obs[1] + text_marker.pose.position.z = 0.1 + text_marker.text = f"{likelihood:.2f}" + text_marker.id = self.marker_id + self.marker_array.markers.append(text_marker) + + # remove the old markers + num_old_markers = self.marker_num_pre[self.beacon_no - 1] - self.marker_id + for i in range(num_old_markers): + old_marker = Marker() + old_marker.header.frame_id = "base_footprint" + old_marker.header.stamp = self.get_clock().now().to_msg() + old_marker.ns = f"candidates_circle{self.beacon_no}" + old_marker.type = Marker.SPHERE + old_marker.action = Marker.DELETE + old_marker.id = i + 1 + self.marker_array.markers.append(old_marker) + + old_text_marker = Marker() + old_text_marker.header.frame_id = "base_footprint" + old_text_marker.header.stamp = self.get_clock().now().to_msg() + old_text_marker.ns = f"candidates_text{self.beacon_no}" + old_text_marker.type = Marker.TEXT_VIEW_FACING + old_text_marker.action = Marker.DELETE + old_text_marker.id = i + 1 + self.marker_array.markers.append(old_text_marker) + # update the marker number + self.marker_num_pre[self.beacon_no - 1] = self.marker_id + def get_geometry_consistency(self, beacons): geometry_description = {} consistency = 1.0 @@ -548,8 +627,8 @@ def get_two_beacons(self): lidar_cov[2, 2] /= max_likelihood # publish the lidar pose self.pub_lidar_pose(lidar_pose, lidar_cov) - if self.visualize_candidate: - self.visualize_candidates(beacons, max_likelihood, self.landmarks_set[max_likelihood_idx]['consistency']) + if self.visualize_true: + self.visualize_sets(beacons, max_likelihood, self.landmarks_set[max_likelihood_idx]['consistency']) # also print the likelihood self.get_logger().info(f"lidar_pose (two): {lidar_pose}") From 06e7d2a1b0b78e6018408494a2cb8d26b1675337 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Mon, 21 Apr 2025 21:01:15 +0800 Subject: [PATCH 09/15] Modified: (lidar_loc) visualization set (z=-1) --- .../lidar_localization_pkg/lidar_member_function.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 33d2be9..d70fb8d 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -416,12 +416,12 @@ def visualize_sets(self, beacons, max_likelihood, consistency): marker.ns = "chosen_landmarks" marker.type = Marker.SPHERE marker.action = Marker.ADD - marker.scale.x = 0.1 - marker.scale.y = 0.1 + marker.scale.x = 0.15 + marker.scale.y = 0.15 marker.scale.z = 0.01 marker.pose.position.x = beacon[0] marker.pose.position.y = beacon[1] - marker.pose.position.z = 0.0 + marker.pose.position.z = -0.1 if len(beacons) == 3: marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0) elif len(beacons) == 2: From 1aea2c353454da2b3f72679c33f35a2b4a49e988 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Mon, 21 Apr 2025 21:02:24 +0800 Subject: [PATCH 10/15] Modified: (lidar_loc) P_pred z --- .../lidar_localization_pkg/lidar_member_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index d70fb8d..33ba16c 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -154,7 +154,7 @@ def pred_pose_callback(self, msg): self.P_pred = np.array([ [msg.pose.covariance[0]*100, 0, 0], [0, msg.pose.covariance[7]*100, 0], - [0, 0, msg.pose.covariance[35]*1e6] + [0, 0, msg.pose.covariance[35]*1e3] # TODO: experiement with the value, do I give too much flexibility to the yaw? What value is better? ]) def get_robot_pose(self): From 4270fc3c799a436777ee72987a849e5ec8bed426 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Mon, 21 Apr 2025 21:04:49 +0800 Subject: [PATCH 11/15] Added: (lidar_loc) angle consistency --- .../lidar_member_function.py | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 33ba16c..5ceacb4 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -566,16 +566,32 @@ def pub_lidar_pose(self, lidar_pose, lidar_cov): self.lidar_pose_pub.publish(self.lidar_pose_msg) def get_two_beacons(self): + two_index = [] self.get_logger().info("get two beacons") + + # get the index of the two beacons with candidates for i in range(3): if len(self.landmarks_candidate[i]['obs_candidates']) > 0: two_index.append(i) - if len(two_index) < 2: # should not require this - self.get_logger().debug("not enough beacons") - return - # check geometry consistency + + self.get_logger().info(f"two_index: {two_index}") + nominal_distance = np.linalg.norm(self.landmarks_map[two_index[0]] - self.landmarks_map[two_index[1]]) + self.get_logger().info(f"nominal distance: {nominal_distance:.2f}") + + # nominal angle + angle1 = np.arctan2(self.landmarks_map[two_index[1]][1] - self.robot_pose[1], self.landmarks_map[two_index[1]][0] - self.robot_pose[0]) + angle0 = np.arctan2(self.landmarks_map[two_index[0]][1] - self.robot_pose[1], self.landmarks_map[two_index[0]][0] - self.robot_pose[0]) + # print angle1, angle0 and nominal_angle in logger + self.get_logger().info(f"angle1: {angle1 * 180 / np.pi:.2f} degree") + self.get_logger().info(f"angle0: {angle0 * 180 / np.pi:.2f} degree") + + nominal_angle = angle_limit_checking(angle1 - angle0) + # print nominal angle, display in degree + self.get_logger().info(f"nominal angle: {nominal_angle * 180 / np.pi:.2f} degree") + + # get the sets for i in range(len(self.landmarks_candidate[two_index[0]]['obs_candidates'])): for j in range(len(self.landmarks_candidate[two_index[1]]['obs_candidates'])): set = { @@ -584,10 +600,29 @@ def get_two_beacons(self): 1: self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['position'] } } - # consistency of the set + # geometry consistency + # 1. check the cross product: 'robot to beacon 0' cross 'robot to beacon 1' + cross = np.cross(set['beacons'][0], set['beacons'][1]) # TODO: check if beacon 0 and 1 will be in the order of beacon a, b and c + if self.side == 0: # yellow is clockwise(negative) + if cross > 0: + self.get_logger().error("cross product is positive") + continue + elif self.side == 1: # blue is counter-clockwise(positive) + if cross < 0: + self.get_logger().error("cross product is negative") + continue + # 2. check the distance between the two beacons set['consistency'] = 1 - abs(np.linalg.norm(set['beacons'][0] - set['beacons'][1]) - nominal_distance) / nominal_distance if set['consistency'] < self.consistency_threshold_two: - self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold}: {set['consistency']}") + self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold_two}: {set['consistency']}") + continue + # 3. check the angle between the two beacons + angle1 = np.arctan2(set['beacons'][1][1], set['beacons'][1][0]) + angle0 = np.arctan2(set['beacons'][0][1], set['beacons'][0][0]) + angle = angle_limit_checking(angle1 - angle0) + # angle = np.arctan2(set['beacons'][1][1] - set['beacons'][0][1], set['beacons'][1][0] - set['beacons'][0][0]) + if abs(angle_limit_checking(angle - nominal_angle)) > np.pi / 18: # 10 degree + self.get_logger().error(f"Angle difference is too large: {angle * 180 / np.pi:.2f} degree") continue # probability of the set set['probability_set'] = self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['probability'] * self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['probability'] From 19219a14a45409c4fc24eaaf89e61604a0baa471 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Wed, 23 Apr 2025 20:02:33 +0800 Subject: [PATCH 12/15] Add: (lidar_loc) lidar_pose with 2 beacons --- .../lidar_member_function.py | 92 +++++++++++-------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 5ceacb4..6df3360 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -244,7 +244,7 @@ def get_obs_candidate(self, landmark): print(f"Robot pose from TF: {self.robot_pose}") x_o, y_o = landmark r_prime = np.sqrt((x_o - x_r) ** 2 + (y_o - y_r) ** 2) - temp = angle_limit_checking(np.arctan2(y_o - y_r, x_o - x_r)) # limit checking is not necessary right? + temp = np.arctan2(y_o - y_r, x_o - x_r) # limit checking is not necessary right? theta_prime = angle_limit_checking(temp - phi_r) H = np.array([ @@ -261,21 +261,16 @@ def get_obs_candidate(self, landmark): for obs in self.obs_raw: r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) theta_z = np.arctan2(obs[1], obs[0]) + if r_z < 1.3: + obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z) + obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z) + r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) + theta_z = np.arctan2(obs[1], obs[0]) y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)]) di_square = y.T @ S_inv @ y likelihood = np.exp(-0.5 * di_square) if likelihood > self.likelihood_threshold: - # self.get_logger().info(f"Likelihood: {likelihood}, obs:{obs}") - if r_z < 1.3: - obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z) - obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z) - r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) - theta_z = np.arctan2(obs[1], obs[0]) - y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)]) - di_square = y.T @ S_inv @ y - likelihood = np.exp(-0.5 * di_square) - # self.get_logger().info(f"adjLikelihood: {likelihood}, obs:{obs}") obs_candidates.append({'position': obs, 'probability': likelihood}) if self.visualize_true: self.visualize_candidates(obs, likelihood) @@ -570,23 +565,21 @@ def get_two_beacons(self): two_index = [] self.get_logger().info("get two beacons") - # get the index of the two beacons with candidates + # get the index of the two beacons with candidates and obstacle probability larger than 0.8 for i in range(3): - if len(self.landmarks_candidate[i]['obs_candidates']) > 0: + if any(candidate['probability'] > self.likelihood_threshold_two for candidate in self.landmarks_candidate[i]['obs_candidates']): two_index.append(i) + + if len(two_index) < 2: + return self.get_logger().info(f"two_index: {two_index}") + # nominal geometry nominal_distance = np.linalg.norm(self.landmarks_map[two_index[0]] - self.landmarks_map[two_index[1]]) self.get_logger().info(f"nominal distance: {nominal_distance:.2f}") - - # nominal angle angle1 = np.arctan2(self.landmarks_map[two_index[1]][1] - self.robot_pose[1], self.landmarks_map[two_index[1]][0] - self.robot_pose[0]) angle0 = np.arctan2(self.landmarks_map[two_index[0]][1] - self.robot_pose[1], self.landmarks_map[two_index[0]][0] - self.robot_pose[0]) - # print angle1, angle0 and nominal_angle in logger - self.get_logger().info(f"angle1: {angle1 * 180 / np.pi:.2f} degree") - self.get_logger().info(f"angle0: {angle0 * 180 / np.pi:.2f} degree") - nominal_angle = angle_limit_checking(angle1 - angle0) # print nominal angle, display in degree self.get_logger().info(f"nominal angle: {nominal_angle * 180 / np.pi:.2f} degree") @@ -605,11 +598,11 @@ def get_two_beacons(self): cross = np.cross(set['beacons'][0], set['beacons'][1]) # TODO: check if beacon 0 and 1 will be in the order of beacon a, b and c if self.side == 0: # yellow is clockwise(negative) if cross > 0: - self.get_logger().error("cross product is positive") + # self.get_logger().debug("cross product is positive") continue elif self.side == 1: # blue is counter-clockwise(positive) if cross < 0: - self.get_logger().error("cross product is negative") + # self.get_logger().debug("cross product is negative") continue # 2. check the distance between the two beacons set['consistency'] = 1 - abs(np.linalg.norm(set['beacons'][0] - set['beacons'][1]) - nominal_distance) / nominal_distance @@ -620,38 +613,35 @@ def get_two_beacons(self): angle1 = np.arctan2(set['beacons'][1][1], set['beacons'][1][0]) angle0 = np.arctan2(set['beacons'][0][1], set['beacons'][0][0]) angle = angle_limit_checking(angle1 - angle0) - # angle = np.arctan2(set['beacons'][1][1] - set['beacons'][0][1], set['beacons'][1][0] - set['beacons'][0][0]) if abs(angle_limit_checking(angle - nominal_angle)) > np.pi / 18: # 10 degree self.get_logger().error(f"Angle difference is too large: {angle * 180 / np.pi:.2f} degree") continue + # self.get_logger().info(f"angle1: {angle1 * 180 / np.pi:.2f} degree") + # self.get_logger().info(f"angle0: {angle0 * 180 / np.pi:.2f} degree") + # self.get_logger().info(f"cadidate angle: {angle * 180 / np.pi:.2f} degree") # probability of the set set['probability_set'] = self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['probability'] * self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['probability'] if set['probability_set'] < self.likelihood_threshold_two: - self.get_logger().debug(f"Probability is less than {self.likelihood_threshold}: {set['probability_set']}") + self.get_logger().debug(f"Probability is less than {self.likelihood_threshold_two}: {set['probability_set']}") continue self.landmarks_set.append(set) - # print landmarks_set for debug - if self.debug_mode: - for i, set in enumerate(self.landmarks_set): - print(f"Set {i + 1}:") - print(f"Probability: {set['probability_set']}") - print(f"Geometry Consistency: {set['consistency']}") # check if there is valid set if len(self.landmarks_set) == 0: self.get_logger().warn("no valid set") return - # use the set with the highest probability_set + + # use the set with the highest probability max_likelihood = max(set['probability_set'] for set in self.landmarks_set) max_likelihood_idx = next(i for i, set in enumerate(self.landmarks_set) if set['probability_set'] == max_likelihood) - beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][two_index[0]], self.landmarks_set[max_likelihood_idx]['beacons'][two_index[1]]] + beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][0], self.landmarks_set[max_likelihood_idx]['beacons'][1]] # calculate the lidar pose - # take the average of the position given by the two beacons - lidar_pose = np.zeros(3) # Ensure that lidar_pose has three elements - pose_1 = self.landmarks_map[two_index[0]] - self.landmarks_set[max_likelihood_idx]['beacons'][0] - pose_2 = self.landmarks_map[two_index[1]] - self.landmarks_set[max_likelihood_idx]['beacons'][1] - lidar_pose[:2] = (pose_1 + pose_2) / 2 # Assign x and y to the first two elements - lidar_pose[2] = angle_limit_checking(np.arctan2(pose_1[1], pose_1[0]) - np.arctan2(beacons[0][1], beacons[0][0])) + lidar_pose = self.get_lidar_pose_two( + two_index[0], + two_index[1], + beacons[0][0], beacons[0][1], + beacons[1][0], beacons[1][1] + ) # check if the lidar pose is in the map if lidar_pose[0] < 0 or lidar_pose[0] > 3 or lidar_pose[1] < 0 or lidar_pose[1] > 2: self.get_logger().debug("lidar pose is out of map") @@ -671,6 +661,34 @@ def get_two_beacons(self): self.clear_data() + def get_lidar_pose_two(self, landmark_index_1, landmark_index_2, bxr, byr, cxr, cyr): + + lidar_pose = np.zeros(3) + # the beacons position w.r.t. map + bxm = self.landmarks_map[landmark_index_1][0] + bym = self.landmarks_map[landmark_index_1][1] + cxm = self.landmarks_map[landmark_index_2][0] + cym = self.landmarks_map[landmark_index_2][1] + x = cxm - bxm + y = cym - bym + xr = cxr - bxr + yr = cyr - byr + # calculate the angle from robot to map + costheta = (xr*x + yr*y) / (xr**2 + yr**2) + sintheta = (xr*y - yr*x) / (xr**2 + yr**2) + lidar_pose[2] = np.arctan2(sintheta, costheta) + # find the linear translation from robot to map + # 1. transform beacon positions from w.r.t. robot to w.r.t. map + bx = costheta * bxr - sintheta * byr + by = sintheta * bxr + costheta * byr + cx = costheta * cxr - sintheta * cyr + cy = sintheta * cxr + costheta * cyr + # 2. linear translation from robot to map, take the average of the result from the two beacons + lidar_pose[0] = (bxm - bx)/2 + (cxm - cx)/2 + lidar_pose[1] = (bym - by)/2 + (cym - cy)/2 + + return lidar_pose + def quaternion_from_euler(ai, aj, ak): ai /= 2.0 aj /= 2.0 From aed2c4d821430aea00f8dd111b67127ad337598f Mon Sep 17 00:00:00 2001 From: jossiewang Date: Wed, 23 Apr 2025 20:11:28 +0800 Subject: [PATCH 13/15] Add: compensation using TF --- .../lidar_member_function.py | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 6df3360..8c20c4e 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -539,7 +539,8 @@ def get_geometry_consistency(self, beacons): return consistency - def pub_lidar_pose(self, lidar_pose, lidar_cov): + def pub_lidar_pose(self, lidar_pose_, lidar_cov): + lidar_pose = self.pose_compensation(lidar_pose_) # publish the lidar pose self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() self.lidar_pose_msg.header.frame_id = 'map' #TODO: param @@ -646,7 +647,7 @@ def get_two_beacons(self): if lidar_pose[0] < 0 or lidar_pose[0] > 3 or lidar_pose[1] < 0 or lidar_pose[1] > 2: self.get_logger().debug("lidar pose is out of map") return - lidar_cov = np.diag([0.05**2, 0.05**2, 0.05**2]) + lidar_cov = np.diag([0.1**2, 0.1**2, 0.1**2]) lidar_cov[0, 0] /= max_likelihood lidar_cov[1, 1] /= max_likelihood lidar_cov[2, 2] /= max_likelihood @@ -688,6 +689,43 @@ def get_lidar_pose_two(self, landmark_index_1, landmark_index_2, bxr, byr, cxr, lidar_pose[1] = (bym - by)/2 + (cym - cy)/2 return lidar_pose + + def pose_compensation(self, lidar_pose): + # find the translation and rotation from obs_time to now using TF + try: + now_transform = self.tf_buffer.lookup_transform( # get the latest transform + self.robot_parent_frame_id, + self.robot_frame_id, + rclpy.time.Time(), + timeout=rclpy.duration.Duration(seconds=0.1) + ) + relative_transform = now_transform + relative_transform.transform.translation.x -= self.predict_transform.transform.translation.x + relative_transform.transform.translation.y -= self.predict_transform.transform.translation.y + # find the relative rotation + q1_inv = [ + self.predict_transform.transform.rotation.x, + self.predict_transform.transform.rotation.y, + self.predict_transform.transform.rotation.z, + -self.predict_transform.transform.rotation.w # Negate for inverse + ] + + q0 = now_transform.transform.rotation + qr = [ + q0.x * q1_inv[3] + q0.w * q1_inv[0] + q0.y * q1_inv[2] - q0.z * q1_inv[1], + q0.y * q1_inv[3] + q0.w * q1_inv[1] + q0.z * q1_inv[0] - q0.x * q1_inv[2], + q0.z * q1_inv[3] + q0.w * q1_inv[2] + q0.x * q1_inv[1] - q0.y * q1_inv[0], + q0.w * q1_inv[3] - q0.x * q1_inv[0] - q0.y * q1_inv[1] - q0.z * q1_inv[2] + ] + + lidar_pose[0] += relative_transform.transform.translation.x + lidar_pose[1] += relative_transform.transform.translation.y + lidar_pose[2] += yaw_from_quaternion(qr[0], qr[1], qr[2], qr[3]) + return lidar_pose + except (LookupException, ConnectivityException, ExtrapolationException) as e: + self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}') + self.get_logger().error("Compensation: Could not transform the robot pose") + return lidar_pose def quaternion_from_euler(ai, aj, ak): ai /= 2.0 From 70769e315d0489fec0a9ea851119780525e41128 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Wed, 23 Apr 2025 20:17:30 +0800 Subject: [PATCH 14/15] Add:(launch) param for lidar_pose of two beacons --- localization-devel-ws/ekf/launch/blue.launch | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/localization-devel-ws/ekf/launch/blue.launch b/localization-devel-ws/ekf/launch/blue.launch index a7545c2..5989f99 100644 --- a/localization-devel-ws/ekf/launch/blue.launch +++ b/localization-devel-ws/ekf/launch/blue.launch @@ -35,7 +35,9 @@ - + + + From 161e9defceb83e0ca82d4bf0dde161ff28ec0330 Mon Sep 17 00:00:00 2001 From: jossiewang Date: Wed, 23 Apr 2025 20:26:25 +0800 Subject: [PATCH 15/15] Modified:(lidar_loc) remove old markers --- .../lidar_localization_pkg/lidar_member_function.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 8c20c4e..7698622 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -291,6 +291,7 @@ def get_landmarks_candidate(self): landmarks_candidate.append(candidate) if self.visualize_true: + self.remove_old_markers() self.circles_pub.publish(self.marker_array) # self.get_logger().debug("Published marker array") self.marker_array.markers.clear() # clean up (is this enough?) @@ -423,6 +424,17 @@ def visualize_sets(self, beacons, max_likelihood, consistency): marker.color = ColorRGBA(r=0.5, g=0.5, b=0.0, a=1.0) marker.id = i marker_array.markers.append(marker) + + if len(beacons) == 2: + # delete marker id 3 + marker_delete = Marker() + marker_delete.header.frame_id = "base_footprint" + marker_delete.ns = "chosen_landmarks" + marker_delete.id = 3 + marker_delete.type = Marker.SPHERE + marker_delete.action = Marker.DELETE + marker_array.markers.append(marker_delete) + # add the max_likelihood, consistency to the marker array marker = Marker() marker.header.frame_id = "map" @@ -488,6 +500,7 @@ def visualize_candidates(self, obs, likelihood): text_marker.id = self.marker_id self.marker_array.markers.append(text_marker) + def remove_old_markers(self): # remove the old markers num_old_markers = self.marker_num_pre[self.beacon_no - 1] - self.marker_id for i in range(num_old_markers):