diff --git a/localization-devel-ws/ekf/launch/blue.launch b/localization-devel-ws/ekf/launch/blue.launch index a7545c2..5989f99 100644 --- a/localization-devel-ws/ekf/launch/blue.launch +++ b/localization-devel-ws/ekf/launch/blue.launch @@ -35,7 +35,9 @@ - + + + diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py index 49a67e4..7698622 100644 --- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py +++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py @@ -1,6 +1,8 @@ import rclpy from rclpy.node import Node +from tf2_ros import Buffer, TransformListener, LookupException, ConnectivityException, ExtrapolationException + from geometry_msgs.msg import PoseWithCovarianceStamped from obstacle_detector.msg import Obstacles from visualization_msgs.msg import Marker, MarkerArray @@ -13,19 +15,31 @@ class LidarLocalization(Node): # inherit from Node def __init__(self): super().__init__('lidar_localization_node') - # Declare parameters + # Declare mode parameters self.declare_parameter('side', 0) self.declare_parameter('debug_mode', False) self.declare_parameter('visualize_candidate', True) + + # Declare algorithm parameters self.declare_parameter('likelihood_threshold', 0.001) + self.declare_parameter('likelihood_threshold_two', 0.5) self.declare_parameter('consistency_threshold', 0.9) + self.declare_parameter('consistency_threshold_two', 0.97) + self.declare_parameter('robot_frame_id', 'base_footprint') + self.declare_parameter('robot_parent_frame_id', 'map') + self.declare_parameter('R', [0.0025, 0.0025]) # measurement noise covariance matrix # Get parameters self.side = self.get_parameter('side').get_parameter_value().integer_value self.debug_mode = self.get_parameter('debug_mode').get_parameter_value().bool_value - self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value + self.visualize_true = self.get_parameter('visualize_candidate').get_parameter_value().bool_value self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value + self.likelihood_threshold_two = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value + self.consistency_threshold_two = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value + self.robot_frame_id = self.get_parameter('robot_frame_id').get_parameter_value().string_value + self.robot_parent_frame_id = self.get_parameter('robot_parent_frame_id').get_parameter_value().string_value + self.R = np.array(self.get_parameter('R').get_parameter_value().double_array_value) self.adjust_factor = 1 @@ -48,20 +62,24 @@ def __init__(self): # ros settings self.lidar_pose_pub = self.create_publisher(PoseWithCovarianceStamped, 'lidar_pose', 10) - if self.visualize_candidate: + + if self.visualize_true: + self.marker_array = MarkerArray() + self.marker_num_pre = np.array([0, 0, 0]) + self.marker_id = 0 self.circles_pub = self.create_publisher(MarkerArray, 'candidates', 10) + self.subscription = self.create_subscription( Obstacles, 'raw_obstacles', self.obstacle_callback, 10) - self.subscription = self.create_subscription( + self.subscription = self.create_subscription( # if TF is not available PoseWithCovarianceStamped, 'final_pose', self.pred_pose_callback, 10 ) - # subscribe to set_lidar_side topic self.subscription = self.create_subscription( String, 'set_lidar_side', @@ -70,50 +88,109 @@ def __init__(self): ) self.subscription # prevent unused variable warning - # ros debug logger - self.get_logger().debug('Lidar Localization Node has been initialized') + # tf2 buffer + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self) + + self.get_logger().debug('Lidar Localization Node initialized') - self.init_landmarks_map(self.landmarks_map) + self.init_landmarks_map() + self.landmarks_set = [] self.robot_pose = [] - self.P_pred = [] + self.robot_pose_topic = [] + self.P_pred = np.array([[0.05**2, 0.0, 0.0], [0.0, 0.05**2, 0.0], [0.0, 0.0, 0.1]]) + self.P_pred_topic = [] self.newPose = False - self.R = np.array([[0.05**2, 0.0], [0.0, 0.05**2]]) # R: measurement noise; TODO: tune the value self.lidar_pose_msg = PoseWithCovarianceStamped() - + def obstacle_callback(self, msg): - self.get_logger().debug('obstacle detected') - # obstacle operation + self.obs_time = msg.header.stamp self.obs_raw = [] + + # get latest robot pose + if self.get_robot_pose() == -1: + self.get_logger().debug("no new robot pose") + return + + # get the obstacles for obs in msg.circles: self.obs_raw.append(np.array([obs.center.x, obs.center.y])) - self.obs_time = msg.header.stamp - # data processing - if self.newPose == False: # Check if robot_pose or P_pred is empty - self.get_logger().debug("no new robot pose or P_pred") + + # main: data processing + self.landmarks_candidate = self.get_landmarks_candidate() + + # if each of the three beacon has at least one candidate + landmarks_with_candidates = 0 + for i in range(3): + if len(self.landmarks_candidate[i]['obs_candidates']) > 0: + landmarks_with_candidates += 1 + if landmarks_with_candidates == 3: + self.landmarks_set = self.get_landmarks_set() + elif landmarks_with_candidates == 2: + self.get_two_beacons() # or devide into more steps? return - self.landmarks_candidate = self.get_landmarks_candidate(self.landmarks_map, self.obs_raw) - self.landmarks_set = self.get_landmarks_set(self.landmarks_candidate) - if len(self.landmarks_set) == 0: + else: + self.get_logger().warn("less than two landmarks") + return + + # for the normal case complete three landmarks + if len(self.landmarks_set) == 0: # when fail to test consistency and likelihood check self.get_logger().debug("empty landmarks set") + self.get_two_beacons() return - self.get_lidar_pose(self.landmarks_set, self.landmarks_map) + + self.get_lidar_pose() + # clear used data self.clear_data() def pred_pose_callback(self, msg): - # self.get_logger().debug("Robot pose callback triggered") self.newPose = True - orientation = euler_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # raw, pitch, *yaw - # check orientation range + orientation = yaw_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # raw, pitch, *yaw + # check orientation range TODO: should be -pi to pi?? if orientation < 0: orientation += 2 * np.pi - self.robot_pose = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) + self.robot_pose_topic = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation]) self.P_pred = np.array([ [msg.pose.covariance[0]*100, 0, 0], [0, msg.pose.covariance[7]*100, 0], - [0, 0, msg.pose.covariance[35]*1e6] + [0, 0, msg.pose.covariance[35]*1e3] # TODO: experiement with the value, do I give too much flexibility to the yaw? What value is better? ]) + def get_robot_pose(self): + # check if TF is available. If true, use the TF. If false, use the latest topic + try: + self.predict_transform = self.tf_buffer.lookup_transform( + self.robot_parent_frame_id, + self.robot_frame_id, + self.obs_time + ) + self.robot_pose = np.array([ + self.predict_transform.transform.translation.x, + self.predict_transform.transform.translation.y, + yaw_from_quaternion( + self.predict_transform.transform.rotation.x, + self.predict_transform.transform.rotation.y, + self.predict_transform.transform.rotation.z, + self.predict_transform.transform.rotation.w + ) + ]) + return 0 + except (LookupException, ConnectivityException, ExtrapolationException) as e: + self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}') + self.get_logger().info("now try to use the latest topic") + if self.newPose == False: + self.get_logger().error("no new predict topic, skip.") + return -1 + else: + self.robot_pose = np.array([ + self.robot_pose_topic[0], + self.robot_pose_topic[1], + self.robot_pose_topic[2] + ]) + self.get_logger().info("use the latest topic") + return 0 + def set_lidar_side_callback(self, msg): side = msg.data.lower() if side in ['0', '1', 'yellow', 'blue']: @@ -131,22 +208,26 @@ def set_lidar_side_callback(self, msg): np.array([3.094, 1.948]), np.array([-0.094, 1.0]) ] - self.init_landmarks_map(self.landmarks_map) + self.init_landmarks_map() self.get_logger().debug(f"Set lidar side to {self.side}") else: self.get_logger().warn("Invalid side value") - def init_landmarks_map(self, landmarks_map): - self.landmarks_map = landmarks_map + def init_landmarks_map(self): # calculate the geometry description of landmarks self.geometry_description_map = {} - NUM_LANDMARKS = len(landmarks_map) + NUM_LANDMARKS = len(self.landmarks_map) for i in range(NUM_LANDMARKS): for j in range(i + 1, NUM_LANDMARKS): if i == j: continue - d_ij = np.linalg.norm(landmarks_map[i] - landmarks_map[j]) + d_ij = np.linalg.norm(self.landmarks_map[i] - self.landmarks_map[j]) self.geometry_description_map[(i, j)] = d_ij + # calculate the landmark sequence, mark clockwise or counter-clockwise (clockwise for yellow, counter-clockwise for blue) + if np.cross(self.landmarks_map[1] - self.landmarks_map[0], self.landmarks_map[2] - self.landmarks_map[0]) > 0: + self.geometry_description_map[(3,3)] = 1 # blue + else: + self.geometry_description_map[(3,3)] = -1 # yellow def clear_data(self): self.obs_raw = [] @@ -156,29 +237,33 @@ def clear_data(self): self.landmarks_set = [] self.newPose = False - def get_obs_candidate(self, landmark, obs_raw): + def get_obs_candidate(self, landmark): obs_candidates = [] x_r, y_r, phi_r = self.robot_pose if self.debug_mode: print(f"Robot pose from TF: {self.robot_pose}") x_o, y_o = landmark r_prime = np.sqrt((x_o - x_r) ** 2 + (y_o - y_r) ** 2) - # theta_rob = np.arctan2( - temp = angle_limit_checking(np.arctan2(y_o - y_r, x_o - x_r)) # limit checking is not necessary right? + temp = np.arctan2(y_o - y_r, x_o - x_r) # limit checking is not necessary right? theta_prime = angle_limit_checking(temp - phi_r) H = np.array([ [-(x_o - x_r) / r_prime, -(y_o - y_r) / r_prime, 0], [(y_o - y_r) / r_prime ** 2, -(x_o - x_r) / r_prime ** 2, -1] ]) + if self.P_pred.size == 0 or self.P_pred.shape != (3, 3): + self.get_logger().error("P_pred is not properly initialized") + return [] S = H @ self.P_pred @ H.T + self.R S_inv = np.linalg.inv(S) - S_det = np.linalg.det(S) - marker_id = 0 - marker_array = MarkerArray() - for obs in obs_raw: + for obs in self.obs_raw: + r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) + theta_z = np.arctan2(obs[1], obs[0]) + if r_z < 1.3: + obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z) + obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z) r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) theta_z = np.arctan2(obs[1], obs[0]) y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)]) @@ -186,69 +271,31 @@ def get_obs_candidate(self, landmark, obs_raw): likelihood = np.exp(-0.5 * di_square) if likelihood > self.likelihood_threshold: - # self.get_logger().info(f"Likelihood: {likelihood}, obs:{obs}") - if r_z < 1.3: - obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z) - obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z) - r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2) - theta_z = np.arctan2(obs[1], obs[0]) - y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)]) - di_square = y.T @ S_inv @ y - likelihood = np.exp(-0.5 * di_square) - # self.get_logger().info(f"adjLikelihood: {likelihood}, obs:{obs}") obs_candidates.append({'position': obs, 'probability': likelihood}) - # if self.visualize_candidate and self.beacon_no == 1: - # marker = Marker() - # marker.header.frame_id = "robot_predict" - # marker.header.stamp = self.get_clock().now().to_msg() - # marker.ns = "candidates" - # marker.type = Marker.SPHERE - # marker.action = Marker.ADD - # marker.scale.x = 0.1 - # marker.scale.y = 0.1 - # marker.scale.z = 0.01 - - # text_marker = Marker() - # text_marker.header.frame_id = "robot_predict" - # text_marker.header.stamp = self.get_clock().now().to_msg() - # text_marker.ns = "text" - # text_marker.type = Marker.TEXT_VIEW_FACING - # text_marker.action = Marker.ADD - # text_marker.scale.z = 0.1 - # text_marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) # White text - - # # use visualization_msgs to visualize the likelihood - # marker.pose.position.x = obs[0] - # marker.pose.position.y = obs[1] - # marker.pose.position.z = 0.0 - # marker.color = ColorRGBA(r=0.0, g=0.5, b=1.0, a=likelihood) - # marker_id += 1 - # marker.id = marker_id - # marker_array.markers.append(marker) - # text_marker.pose.position.x = obs[0] - # text_marker.pose.position.y = obs[1] - # text_marker.pose.position.z = 0.1 - # text_marker.text = f"{likelihood:.2f}" - # text_marker.id = marker_id - # marker_array.markers.append(text_marker) - # if self.visualize_candidate and self.beacon_no == 1: - # self.circles_pub.publish(marker_array) - # self.get_logger().debug("Published marker array") - # # clean up - # marker_array.markers.clear() + if self.visualize_true: + self.visualize_candidates(obs, likelihood) return obs_candidates - def get_landmarks_candidate(self, landmarks_map, obs_raw): + def get_landmarks_candidate(self): + landmarks_map = self.landmarks_map landmarks_candidate = [] self.beacon_no = 0 for landmark in landmarks_map: self.beacon_no += 1 + self.marker_id = 0 candidate = { 'landmark': landmark, - 'obs_candidates': self.get_obs_candidate(landmark, obs_raw) + 'obs_candidates': self.get_obs_candidate(landmark) } landmarks_candidate.append(candidate) + + if self.visualize_true: + self.remove_old_markers() + self.circles_pub.publish(self.marker_array) + # self.get_logger().debug("Published marker array") + self.marker_array.markers.clear() # clean up (is this enough?) + # print landmarks_candidate for debug if self.debug_mode: for i, landmark in enumerate(landmarks_candidate): @@ -257,8 +304,10 @@ def get_landmarks_candidate(self, landmarks_map, obs_raw): print(f"Obs {j + 1}: {obs_candidate['position']} with probability {obs_candidate['probability']}") return landmarks_candidate - def get_landmarks_set(self, landmarks_candidate): + def get_landmarks_set(self): + landmarks_candidate = self.landmarks_candidate landmarks_set = [] + for i in range(len(landmarks_candidate[0]['obs_candidates'])): for j in range(len(landmarks_candidate[1]['obs_candidates'])): for k in range(len(landmarks_candidate[2]['obs_candidates'])): @@ -287,13 +336,16 @@ def get_landmarks_set(self, landmarks_candidate): return landmarks_set - def get_lidar_pose(self, landmarks_set, landmarks_map): - if not landmarks_set: - raise ValueError("landmarks_set is empty") + def get_lidar_pose(self): + landmarks_map = self.landmarks_map + landmarks_set = self.landmarks_set + # if not landmarks_set: + # raise ValueError("landmarks_set is empty") + # prefer the set with more beacons - landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) - # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort? - max_likelihood = max(set['probability_set'] for set in landmarks_set) + # landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) # activate this if there's one more beacon + # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort? + max_likelihood = max(set['probability_set'] for set in landmarks_set) # TODO: sort by consistency or likelihood? max_likelihood_idx = next(i for i, set in enumerate(landmarks_set) if set['probability_set'] == max_likelihood) lidar_pose = np.zeros(3) @@ -317,6 +369,8 @@ def get_lidar_pose(self, landmarks_set, landmarks_map): try: X = np.linalg.solve(A.T @ A, A.T @ b) if X[0] < 0 or X[0] > 3 or X[1] < 0 or X[1] > 2: + # result is not in boundary area, try to use two beacons + self.get_two_beacons() return lidar_pose[0] = X[0] lidar_pose[1] = X[1] @@ -335,60 +389,141 @@ def get_lidar_pose(self, landmarks_set, landmarks_map): lidar_cov[1, 1] /= max_likelihood lidar_cov[2, 2] /= max_likelihood - # publish the lidar pose - self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() # TODO: compensation - self.lidar_pose_msg.header.frame_id = 'map' #TODO: param - self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] - self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] - self.lidar_pose_msg.pose.pose.position.z = 0.0 - self.lidar_pose_msg.pose.pose.orientation.x = 0.0 - self.lidar_pose_msg.pose.pose.orientation.y = 0.0 - self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) - self.lidar_pose_msg.pose.covariance = [ - lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] - ] - # self.get_logger().debug(f"lidar_pose: {lidar_pose}") - self.lidar_pose_pub.publish(self.lidar_pose_msg) - # self.get_logger().debug("Published lidar_pose message") + self.pub_lidar_pose(lidar_pose, lidar_cov) except np.linalg.LinAlgError as e: self.get_logger().warn("Linear algebra error: {}".format(e)) # use markerarray to show the landmarks it used - if self.visualize_candidate: - marker_array = MarkerArray() - for i, beacon in enumerate(beacons): - marker = Marker() - marker.header.frame_id = "base_footprint" - marker.header.stamp = self.get_clock().now().to_msg() - marker.ns = "chosen_landmarks" - marker.type = Marker.SPHERE - marker.action = Marker.ADD - marker.scale.x = 0.1 - marker.scale.y = 0.1 - marker.scale.z = 0.01 - marker.pose.position.x = beacon[0] - marker.pose.position.y = beacon[1] - marker.pose.position.z = 0.0 - marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0) - marker.id = i - marker_array.markers.append(marker) - self.circles_pub.publish(marker_array) - self.get_logger().debug("Published marker array") - # clean up - marker_array.markers.clear() + if self.visualize_true: + self.visualize_sets(beacons, max_likelihood, landmarks_set[max_likelihood_idx]['consistency']) else: self.get_logger().debug("not enough beacons") return lidar_pose, lidar_cov + def visualize_sets(self, beacons, max_likelihood, consistency): + marker_array = MarkerArray() + for i, beacon in enumerate(beacons): + marker = Marker() + marker.header.frame_id = "base_footprint" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "chosen_landmarks" + marker.type = Marker.SPHERE + marker.action = Marker.ADD + marker.scale.x = 0.15 + marker.scale.y = 0.15 + marker.scale.z = 0.01 + marker.pose.position.x = beacon[0] + marker.pose.position.y = beacon[1] + marker.pose.position.z = -0.1 + if len(beacons) == 3: + marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0) + elif len(beacons) == 2: + marker.color = ColorRGBA(r=0.5, g=0.5, b=0.0, a=1.0) + marker.id = i + marker_array.markers.append(marker) + + if len(beacons) == 2: + # delete marker id 3 + marker_delete = Marker() + marker_delete.header.frame_id = "base_footprint" + marker_delete.ns = "chosen_landmarks" + marker_delete.id = 3 + marker_delete.type = Marker.SPHERE + marker_delete.action = Marker.DELETE + marker_array.markers.append(marker_delete) + + # add the max_likelihood, consistency to the marker array + marker = Marker() + marker.header.frame_id = "map" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "set_param" + marker.type = Marker.TEXT_VIEW_FACING + marker.action = Marker.ADD + marker.scale.z = 0.1 + marker.pose.position.x = 3.5 + marker.pose.position.y = 2.5 + marker.pose.position.z = 0.5 + marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) + marker.text = f"max_likelihood: {max_likelihood:.2f}, consistency: {consistency:.2f}" + marker.id = 10 + marker_array.markers.append(marker) + # publish the marker array + self.circles_pub.publish(marker_array) + self.get_logger().debug("Published marker array") + # clean up + marker_array.markers.clear() + + def visualize_candidates(self, obs, likelihood): + + self.marker_id += 1 + + # circles + marker = Marker() + marker.header.frame_id = "base_footprint" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = f"candidates_circle{self.beacon_no}" + marker.type = Marker.SPHERE + marker.action = Marker.ADD + marker.scale.x = 0.1 + marker.scale.y = 0.1 + marker.scale.z = 0.01 + + marker.pose.position.x = obs[0] + marker.pose.position.y = obs[1] + marker.pose.position.z = 0.0 + if self.beacon_no == 1: + marker.color = ColorRGBA(r=1.0, g=0.0, b=0.0, a=likelihood) # Red for beacon 1 + elif self.beacon_no == 2: + marker.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=likelihood) # Green for beacon 2 + elif self.beacon_no == 3: + marker.color = ColorRGBA(r=0.0, g=0.0, b=1.0, a=likelihood) # Blue for beacon 3 + marker.id = self.marker_id + self.marker_array.markers.append(marker) + + # texts + text_marker = Marker() + text_marker.header.frame_id = "base_footprint" + text_marker.header.stamp = self.get_clock().now().to_msg() + text_marker.ns = f"candidates_text{self.beacon_no}" + text_marker.type = Marker.TEXT_VIEW_FACING + text_marker.action = Marker.ADD + text_marker.scale.z = 0.1 + text_marker.color = ColorRGBA(r=0.0, g=0.0, b=0.0, a=0.2) # White text + + text_marker.pose.position.x = obs[0] + 0.2 + text_marker.pose.position.y = obs[1] + text_marker.pose.position.z = 0.1 + text_marker.text = f"{likelihood:.2f}" + text_marker.id = self.marker_id + self.marker_array.markers.append(text_marker) + + def remove_old_markers(self): + # remove the old markers + num_old_markers = self.marker_num_pre[self.beacon_no - 1] - self.marker_id + for i in range(num_old_markers): + old_marker = Marker() + old_marker.header.frame_id = "base_footprint" + old_marker.header.stamp = self.get_clock().now().to_msg() + old_marker.ns = f"candidates_circle{self.beacon_no}" + old_marker.type = Marker.SPHERE + old_marker.action = Marker.DELETE + old_marker.id = i + 1 + self.marker_array.markers.append(old_marker) + + old_text_marker = Marker() + old_text_marker.header.frame_id = "base_footprint" + old_text_marker.header.stamp = self.get_clock().now().to_msg() + old_text_marker.ns = f"candidates_text{self.beacon_no}" + old_text_marker.type = Marker.TEXT_VIEW_FACING + old_text_marker.action = Marker.DELETE + old_text_marker.id = i + 1 + self.marker_array.markers.append(old_text_marker) + # update the marker number + self.marker_num_pre[self.beacon_no - 1] = self.marker_id + def get_geometry_consistency(self, beacons): geometry_description = {} consistency = 1.0 @@ -416,6 +551,194 @@ def get_geometry_consistency(self, beacons): consistency = 0 return consistency + + def pub_lidar_pose(self, lidar_pose_, lidar_cov): + lidar_pose = self.pose_compensation(lidar_pose_) + # publish the lidar pose + self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() + self.lidar_pose_msg.header.frame_id = 'map' #TODO: param + self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0] + self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1] + self.lidar_pose_msg.pose.pose.position.z = 0.0 + self.lidar_pose_msg.pose.pose.orientation.x = 0.0 + self.lidar_pose_msg.pose.pose.orientation.y = 0.0 + self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2) + self.lidar_pose_msg.pose.covariance = [ + lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2] + ] + self.lidar_pose_pub.publish(self.lidar_pose_msg) + + def get_two_beacons(self): + + two_index = [] + self.get_logger().info("get two beacons") + + # get the index of the two beacons with candidates and obstacle probability larger than 0.8 + for i in range(3): + if any(candidate['probability'] > self.likelihood_threshold_two for candidate in self.landmarks_candidate[i]['obs_candidates']): + two_index.append(i) + + if len(two_index) < 2: + return + + self.get_logger().info(f"two_index: {two_index}") + + # nominal geometry + nominal_distance = np.linalg.norm(self.landmarks_map[two_index[0]] - self.landmarks_map[two_index[1]]) + self.get_logger().info(f"nominal distance: {nominal_distance:.2f}") + angle1 = np.arctan2(self.landmarks_map[two_index[1]][1] - self.robot_pose[1], self.landmarks_map[two_index[1]][0] - self.robot_pose[0]) + angle0 = np.arctan2(self.landmarks_map[two_index[0]][1] - self.robot_pose[1], self.landmarks_map[two_index[0]][0] - self.robot_pose[0]) + nominal_angle = angle_limit_checking(angle1 - angle0) + # print nominal angle, display in degree + self.get_logger().info(f"nominal angle: {nominal_angle * 180 / np.pi:.2f} degree") + + # get the sets + for i in range(len(self.landmarks_candidate[two_index[0]]['obs_candidates'])): + for j in range(len(self.landmarks_candidate[two_index[1]]['obs_candidates'])): + set = { + 'beacons': { + 0: self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['position'], + 1: self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['position'] + } + } + # geometry consistency + # 1. check the cross product: 'robot to beacon 0' cross 'robot to beacon 1' + cross = np.cross(set['beacons'][0], set['beacons'][1]) # TODO: check if beacon 0 and 1 will be in the order of beacon a, b and c + if self.side == 0: # yellow is clockwise(negative) + if cross > 0: + # self.get_logger().debug("cross product is positive") + continue + elif self.side == 1: # blue is counter-clockwise(positive) + if cross < 0: + # self.get_logger().debug("cross product is negative") + continue + # 2. check the distance between the two beacons + set['consistency'] = 1 - abs(np.linalg.norm(set['beacons'][0] - set['beacons'][1]) - nominal_distance) / nominal_distance + if set['consistency'] < self.consistency_threshold_two: + self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold_two}: {set['consistency']}") + continue + # 3. check the angle between the two beacons + angle1 = np.arctan2(set['beacons'][1][1], set['beacons'][1][0]) + angle0 = np.arctan2(set['beacons'][0][1], set['beacons'][0][0]) + angle = angle_limit_checking(angle1 - angle0) + if abs(angle_limit_checking(angle - nominal_angle)) > np.pi / 18: # 10 degree + self.get_logger().error(f"Angle difference is too large: {angle * 180 / np.pi:.2f} degree") + continue + # self.get_logger().info(f"angle1: {angle1 * 180 / np.pi:.2f} degree") + # self.get_logger().info(f"angle0: {angle0 * 180 / np.pi:.2f} degree") + # self.get_logger().info(f"cadidate angle: {angle * 180 / np.pi:.2f} degree") + # probability of the set + set['probability_set'] = self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['probability'] * self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['probability'] + if set['probability_set'] < self.likelihood_threshold_two: + self.get_logger().debug(f"Probability is less than {self.likelihood_threshold_two}: {set['probability_set']}") + continue + self.landmarks_set.append(set) + + # check if there is valid set + if len(self.landmarks_set) == 0: + self.get_logger().warn("no valid set") + return + + # use the set with the highest probability + max_likelihood = max(set['probability_set'] for set in self.landmarks_set) + max_likelihood_idx = next(i for i, set in enumerate(self.landmarks_set) if set['probability_set'] == max_likelihood) + beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][0], self.landmarks_set[max_likelihood_idx]['beacons'][1]] + # calculate the lidar pose + lidar_pose = self.get_lidar_pose_two( + two_index[0], + two_index[1], + beacons[0][0], beacons[0][1], + beacons[1][0], beacons[1][1] + ) + # check if the lidar pose is in the map + if lidar_pose[0] < 0 or lidar_pose[0] > 3 or lidar_pose[1] < 0 or lidar_pose[1] > 2: + self.get_logger().debug("lidar pose is out of map") + return + lidar_cov = np.diag([0.1**2, 0.1**2, 0.1**2]) + lidar_cov[0, 0] /= max_likelihood + lidar_cov[1, 1] /= max_likelihood + lidar_cov[2, 2] /= max_likelihood + # publish the lidar pose + self.pub_lidar_pose(lidar_pose, lidar_cov) + if self.visualize_true: + self.visualize_sets(beacons, max_likelihood, self.landmarks_set[max_likelihood_idx]['consistency']) + + # also print the likelihood + self.get_logger().info(f"lidar_pose (two): {lidar_pose}") + self.get_logger().info(f"lidar_pose (two) likelihood: {max_likelihood}") + + self.clear_data() + + def get_lidar_pose_two(self, landmark_index_1, landmark_index_2, bxr, byr, cxr, cyr): + + lidar_pose = np.zeros(3) + # the beacons position w.r.t. map + bxm = self.landmarks_map[landmark_index_1][0] + bym = self.landmarks_map[landmark_index_1][1] + cxm = self.landmarks_map[landmark_index_2][0] + cym = self.landmarks_map[landmark_index_2][1] + x = cxm - bxm + y = cym - bym + xr = cxr - bxr + yr = cyr - byr + # calculate the angle from robot to map + costheta = (xr*x + yr*y) / (xr**2 + yr**2) + sintheta = (xr*y - yr*x) / (xr**2 + yr**2) + lidar_pose[2] = np.arctan2(sintheta, costheta) + # find the linear translation from robot to map + # 1. transform beacon positions from w.r.t. robot to w.r.t. map + bx = costheta * bxr - sintheta * byr + by = sintheta * bxr + costheta * byr + cx = costheta * cxr - sintheta * cyr + cy = sintheta * cxr + costheta * cyr + # 2. linear translation from robot to map, take the average of the result from the two beacons + lidar_pose[0] = (bxm - bx)/2 + (cxm - cx)/2 + lidar_pose[1] = (bym - by)/2 + (cym - cy)/2 + + return lidar_pose + + def pose_compensation(self, lidar_pose): + # find the translation and rotation from obs_time to now using TF + try: + now_transform = self.tf_buffer.lookup_transform( # get the latest transform + self.robot_parent_frame_id, + self.robot_frame_id, + rclpy.time.Time(), + timeout=rclpy.duration.Duration(seconds=0.1) + ) + relative_transform = now_transform + relative_transform.transform.translation.x -= self.predict_transform.transform.translation.x + relative_transform.transform.translation.y -= self.predict_transform.transform.translation.y + # find the relative rotation + q1_inv = [ + self.predict_transform.transform.rotation.x, + self.predict_transform.transform.rotation.y, + self.predict_transform.transform.rotation.z, + -self.predict_transform.transform.rotation.w # Negate for inverse + ] + + q0 = now_transform.transform.rotation + qr = [ + q0.x * q1_inv[3] + q0.w * q1_inv[0] + q0.y * q1_inv[2] - q0.z * q1_inv[1], + q0.y * q1_inv[3] + q0.w * q1_inv[1] + q0.z * q1_inv[0] - q0.x * q1_inv[2], + q0.z * q1_inv[3] + q0.w * q1_inv[2] + q0.x * q1_inv[1] - q0.y * q1_inv[0], + q0.w * q1_inv[3] - q0.x * q1_inv[0] - q0.y * q1_inv[1] - q0.z * q1_inv[2] + ] + + lidar_pose[0] += relative_transform.transform.translation.x + lidar_pose[1] += relative_transform.transform.translation.y + lidar_pose[2] += yaw_from_quaternion(qr[0], qr[1], qr[2], qr[3]) + return lidar_pose + except (LookupException, ConnectivityException, ExtrapolationException) as e: + self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}') + self.get_logger().error("Compensation: Could not transform the robot pose") + return lidar_pose def quaternion_from_euler(ai, aj, ak): ai /= 2.0 @@ -438,14 +761,7 @@ def quaternion_from_euler(ai, aj, ak): q[3] = cj * cc + sj * ss return q -def euler_from_quaternion(x, y, z, w): - t0 = +2.0 * (w * x + y * z) - t1 = +1.0 - 2.0 * (x * x + y * y) - roll_x = np.arctan2(t0, t1) - t2 = +2.0 * (w * y - z * x) - t2 = +1.0 if t2 > +1.0 else t2 - t2 = -1.0 if t2 < -1.0 else t2 - pitch_y = np.arcsin(t2) +def yaw_from_quaternion(x, y, z, w): t3 = +2.0 * (w * z + x * y) t4 = +1.0 - 2.0 * (y * y + z * z) yaw_z = np.arctan2(t3, t4)