diff --git a/localization-devel-ws/ekf/launch/blue.launch b/localization-devel-ws/ekf/launch/blue.launch
index a7545c2..5989f99 100644
--- a/localization-devel-ws/ekf/launch/blue.launch
+++ b/localization-devel-ws/ekf/launch/blue.launch
@@ -35,7 +35,9 @@
-
+
+
+
diff --git a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py
index 49a67e4..7698622 100644
--- a/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py
+++ b/localization-devel-ws/global/lidar_localization_pkg/lidar_localization_pkg/lidar_member_function.py
@@ -1,6 +1,8 @@
import rclpy
from rclpy.node import Node
+from tf2_ros import Buffer, TransformListener, LookupException, ConnectivityException, ExtrapolationException
+
from geometry_msgs.msg import PoseWithCovarianceStamped
from obstacle_detector.msg import Obstacles
from visualization_msgs.msg import Marker, MarkerArray
@@ -13,19 +15,31 @@ class LidarLocalization(Node): # inherit from Node
def __init__(self):
super().__init__('lidar_localization_node')
- # Declare parameters
+ # Declare mode parameters
self.declare_parameter('side', 0)
self.declare_parameter('debug_mode', False)
self.declare_parameter('visualize_candidate', True)
+
+ # Declare algorithm parameters
self.declare_parameter('likelihood_threshold', 0.001)
+ self.declare_parameter('likelihood_threshold_two', 0.5)
self.declare_parameter('consistency_threshold', 0.9)
+ self.declare_parameter('consistency_threshold_two', 0.97)
+ self.declare_parameter('robot_frame_id', 'base_footprint')
+ self.declare_parameter('robot_parent_frame_id', 'map')
+ self.declare_parameter('R', [0.0025, 0.0025]) # diagonal of the 2x2 measurement noise covariance (range, bearing variances)
# Get parameters
self.side = self.get_parameter('side').get_parameter_value().integer_value
self.debug_mode = self.get_parameter('debug_mode').get_parameter_value().bool_value
- self.visualize_candidate = self.get_parameter('visualize_candidate').get_parameter_value().bool_value
+ self.visualize_true = self.get_parameter('visualize_candidate').get_parameter_value().bool_value
self.likelihood_threshold = self.get_parameter('likelihood_threshold').get_parameter_value().double_value
+ self.likelihood_threshold_two = self.get_parameter('likelihood_threshold_two').get_parameter_value().double_value
self.consistency_threshold = self.get_parameter('consistency_threshold').get_parameter_value().double_value
+ self.consistency_threshold_two = self.get_parameter('consistency_threshold_two').get_parameter_value().double_value
+ self.robot_frame_id = self.get_parameter('robot_frame_id').get_parameter_value().string_value
+ self.robot_parent_frame_id = self.get_parameter('robot_parent_frame_id').get_parameter_value().string_value
+ self.R = np.diag(self.get_parameter('R').get_parameter_value().double_array_value) # 2x2 diagonal measurement noise covariance
self.adjust_factor = 1
@@ -48,20 +62,24 @@ def __init__(self):
# ros settings
self.lidar_pose_pub = self.create_publisher(PoseWithCovarianceStamped, 'lidar_pose', 10)
- if self.visualize_candidate:
+
+ if self.visualize_true:
+ self.marker_array = MarkerArray()
+ self.marker_num_pre = np.array([0, 0, 0])
+ self.marker_id = 0
self.circles_pub = self.create_publisher(MarkerArray, 'candidates', 10)
+
self.subscription = self.create_subscription(
Obstacles,
'raw_obstacles',
self.obstacle_callback,
10)
- self.subscription = self.create_subscription(
+ self.subscription = self.create_subscription( # fallback pose source when TF is not available
PoseWithCovarianceStamped,
'final_pose',
self.pred_pose_callback,
10
)
- # subscribe to set_lidar_side topic
self.subscription = self.create_subscription(
String,
'set_lidar_side',
@@ -70,50 +88,109 @@ def __init__(self):
)
self.subscription # prevent unused variable warning
- # ros debug logger
- self.get_logger().debug('Lidar Localization Node has been initialized')
+ # tf2 buffer
+ self.tf_buffer = Buffer()
+ self.tf_listener = TransformListener(self.tf_buffer, self)
+
+ self.get_logger().debug('Lidar Localization Node initialized')
- self.init_landmarks_map(self.landmarks_map)
+ self.init_landmarks_map()
+ self.landmarks_set = []
self.robot_pose = []
- self.P_pred = []
+ self.robot_pose_topic = []
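+ # default prediction covariance used until a predicted pose arrives: 0.05 m std-dev in x/y, 0.1 variance in yaw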
+ self.P_pred = np.array([[0.05**2, 0.0, 0.0], [0.0, 0.05**2, 0.0], [0.0, 0.0, 0.1]])
+ self.P_pred_topic = []
self.newPose = False
- self.R = np.array([[0.05**2, 0.0], [0.0, 0.05**2]]) # R: measurement noise; TODO: tune the value
self.lidar_pose_msg = PoseWithCovarianceStamped()
-
+
def obstacle_callback(self, msg):
- self.get_logger().debug('obstacle detected')
- # obstacle operation
+ self.obs_time = msg.header.stamp
self.obs_raw = []
+
+ # get latest robot pose
+ if self.get_robot_pose() == -1:
+ self.get_logger().debug("no new robot pose")
+ return
+
+ # get the obstacles
for obs in msg.circles:
self.obs_raw.append(np.array([obs.center.x, obs.center.y]))
- self.obs_time = msg.header.stamp
- # data processing
- if self.newPose == False: # Check if robot_pose or P_pred is empty
- self.get_logger().debug("no new robot pose or P_pred")
+
+ # main: data processing
+ self.landmarks_candidate = self.get_landmarks_candidate()
+
+ # check whether each of the three beacons has at least one candidate
+ landmarks_with_candidates = 0
+ for i in range(3):
+ if len(self.landmarks_candidate[i]['obs_candidates']) > 0:
+ landmarks_with_candidates += 1
+ if landmarks_with_candidates == 3:
+ self.landmarks_set = self.get_landmarks_set()
+ elif landmarks_with_candidates == 2:
+ self.get_two_beacons() # or divide this into more steps?
return
- self.landmarks_candidate = self.get_landmarks_candidate(self.landmarks_map, self.obs_raw)
- self.landmarks_set = self.get_landmarks_set(self.landmarks_candidate)
- if len(self.landmarks_set) == 0:
+ else:
+ self.get_logger().warn("less than two landmarks")
+ return
+
+ # normal case: all three landmarks have candidates
+ if len(self.landmarks_set) == 0: # when every set fails the consistency / likelihood checks
self.get_logger().debug("empty landmarks set")
+ self.get_two_beacons()
return
- self.get_lidar_pose(self.landmarks_set, self.landmarks_map)
+
+ self.get_lidar_pose()
+
# clear used data
self.clear_data()
def pred_pose_callback(self, msg):
- # self.get_logger().debug("Robot pose callback triggered")
self.newPose = True
- orientation = euler_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # raw, pitch, *yaw
- # check orientation range
+ orientation = yaw_from_quaternion(msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w) # yaw only
+ # normalize orientation to [0, 2*pi) TODO: should the range be -pi to pi instead?
if orientation < 0:
orientation += 2 * np.pi
- self.robot_pose = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation])
+ self.robot_pose_topic = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, orientation])
self.P_pred = np.array([
[msg.pose.covariance[0]*100, 0, 0],
[0, msg.pose.covariance[7]*100, 0],
- [0, 0, msg.pose.covariance[35]*1e6]
+ [0, 0, msg.pose.covariance[35]*1e3] # TODO: experiment with this scale; does it give the yaw too much flexibility? What value is better?
])
+ def get_robot_pose(self):
+ # check whether the TF is available; if so use it, otherwise fall back to the latest pose topic
+ try:
+ self.predict_transform = self.tf_buffer.lookup_transform(
+ self.robot_parent_frame_id,
+ self.robot_frame_id,
+ self.obs_time
+ )
+ self.robot_pose = np.array([
+ self.predict_transform.transform.translation.x,
+ self.predict_transform.transform.translation.y,
+ yaw_from_quaternion(
+ self.predict_transform.transform.rotation.x,
+ self.predict_transform.transform.rotation.y,
+ self.predict_transform.transform.rotation.z,
+ self.predict_transform.transform.rotation.w
+ )
+ ])
+ return 0
+ except (LookupException, ConnectivityException, ExtrapolationException) as e:
+ self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}')
+ self.get_logger().info("now try to use the latest topic")
+ if self.newPose == False:
+ self.get_logger().error("no new predict topic, skip.")
+ return -1
+ else:
+ self.robot_pose = np.array([
+ self.robot_pose_topic[0],
+ self.robot_pose_topic[1],
+ self.robot_pose_topic[2]
+ ])
+ self.get_logger().info("use the latest topic")
+ return 0
+
def set_lidar_side_callback(self, msg):
side = msg.data.lower()
if side in ['0', '1', 'yellow', 'blue']:
@@ -131,22 +208,26 @@ def set_lidar_side_callback(self, msg):
np.array([3.094, 1.948]),
np.array([-0.094, 1.0])
]
- self.init_landmarks_map(self.landmarks_map)
+ self.init_landmarks_map()
self.get_logger().debug(f"Set lidar side to {self.side}")
else:
self.get_logger().warn("Invalid side value")
- def init_landmarks_map(self, landmarks_map):
- self.landmarks_map = landmarks_map
+ def init_landmarks_map(self):
# calculate the geometry description of landmarks
self.geometry_description_map = {}
- NUM_LANDMARKS = len(landmarks_map)
+ NUM_LANDMARKS = len(self.landmarks_map)
for i in range(NUM_LANDMARKS):
for j in range(i + 1, NUM_LANDMARKS):
if i == j:
continue
- d_ij = np.linalg.norm(landmarks_map[i] - landmarks_map[j])
+ d_ij = np.linalg.norm(self.landmarks_map[i] - self.landmarks_map[j])
self.geometry_description_map[(i, j)] = d_ij
+ # calculate the landmark sequence, mark clockwise or counter-clockwise (clockwise for yellow, counter-clockwise for blue)
+ if np.cross(self.landmarks_map[1] - self.landmarks_map[0], self.landmarks_map[2] - self.landmarks_map[0]) > 0:
+ self.geometry_description_map[(3,3)] = 1 # blue
+ else:
+ self.geometry_description_map[(3,3)] = -1 # yellow
def clear_data(self):
self.obs_raw = []
@@ -156,29 +237,33 @@ def clear_data(self):
self.landmarks_set = []
self.newPose = False
- def get_obs_candidate(self, landmark, obs_raw):
+ def get_obs_candidate(self, landmark):
obs_candidates = []
x_r, y_r, phi_r = self.robot_pose
if self.debug_mode:
print(f"Robot pose from TF: {self.robot_pose}")
x_o, y_o = landmark
r_prime = np.sqrt((x_o - x_r) ** 2 + (y_o - y_r) ** 2)
- # theta_rob = np.arctan2(
- temp = angle_limit_checking(np.arctan2(y_o - y_r, x_o - x_r)) # limit checking is not necessary right?
+ temp = np.arctan2(y_o - y_r, x_o - x_r) # arctan2 already returns values in (-pi, pi], so no extra limit check is needed here
theta_prime = angle_limit_checking(temp - phi_r)
H = np.array([
[-(x_o - x_r) / r_prime, -(y_o - y_r) / r_prime, 0],
[(y_o - y_r) / r_prime ** 2, -(x_o - x_r) / r_prime ** 2, -1]
])
+ if self.P_pred.size == 0 or self.P_pred.shape != (3, 3):
+ self.get_logger().error("P_pred is not properly initialized")
+ return []
S = H @ self.P_pred @ H.T + self.R
S_inv = np.linalg.inv(S)
- S_det = np.linalg.det(S)
- marker_id = 0
- marker_array = MarkerArray()
- for obs in obs_raw:
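+ # gate each raw obstacle by Mahalanobis distance: the innovation y is weighted by S^-1 and turned into a likelihood exp(-0.5 * y^T S^-1 y)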
+ for obs in self.obs_raw:
+ r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2)
+ theta_z = np.arctan2(obs[1], obs[0])
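+ # empirical correction for close-range (< 1.3 m) measurements, applied before the innovation is computed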
+ if r_z < 1.3:
+ obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z)
+ obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z)
r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2)
theta_z = np.arctan2(obs[1], obs[0])
y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)])
@@ -186,69 +271,31 @@ def get_obs_candidate(self, landmark, obs_raw):
likelihood = np.exp(-0.5 * di_square)
if likelihood > self.likelihood_threshold:
- # self.get_logger().info(f"Likelihood: {likelihood}, obs:{obs}")
- if r_z < 1.3:
- obs[0] = obs[0] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.cos(theta_z)
- obs[1] = obs[1] + (1-obs[0]) * self.adjust_factor * 0.01414 * np.sin(theta_z)
- r_z = np.sqrt(obs[0] ** 2 + obs[1] ** 2)
- theta_z = np.arctan2(obs[1], obs[0])
- y = np.array([r_z - r_prime, angle_limit_checking(theta_z - theta_prime)])
- di_square = y.T @ S_inv @ y
- likelihood = np.exp(-0.5 * di_square)
- # self.get_logger().info(f"adjLikelihood: {likelihood}, obs:{obs}")
obs_candidates.append({'position': obs, 'probability': likelihood})
- # if self.visualize_candidate and self.beacon_no == 1:
- # marker = Marker()
- # marker.header.frame_id = "robot_predict"
- # marker.header.stamp = self.get_clock().now().to_msg()
- # marker.ns = "candidates"
- # marker.type = Marker.SPHERE
- # marker.action = Marker.ADD
- # marker.scale.x = 0.1
- # marker.scale.y = 0.1
- # marker.scale.z = 0.01
-
- # text_marker = Marker()
- # text_marker.header.frame_id = "robot_predict"
- # text_marker.header.stamp = self.get_clock().now().to_msg()
- # text_marker.ns = "text"
- # text_marker.type = Marker.TEXT_VIEW_FACING
- # text_marker.action = Marker.ADD
- # text_marker.scale.z = 0.1
- # text_marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0) # White text
-
- # # use visualization_msgs to visualize the likelihood
- # marker.pose.position.x = obs[0]
- # marker.pose.position.y = obs[1]
- # marker.pose.position.z = 0.0
- # marker.color = ColorRGBA(r=0.0, g=0.5, b=1.0, a=likelihood)
- # marker_id += 1
- # marker.id = marker_id
- # marker_array.markers.append(marker)
- # text_marker.pose.position.x = obs[0]
- # text_marker.pose.position.y = obs[1]
- # text_marker.pose.position.z = 0.1
- # text_marker.text = f"{likelihood:.2f}"
- # text_marker.id = marker_id
- # marker_array.markers.append(text_marker)
- # if self.visualize_candidate and self.beacon_no == 1:
- # self.circles_pub.publish(marker_array)
- # self.get_logger().debug("Published marker array")
- # # clean up
- # marker_array.markers.clear()
+ if self.visualize_true:
+ self.visualize_candidates(obs, likelihood)
return obs_candidates
- def get_landmarks_candidate(self, landmarks_map, obs_raw):
+ def get_landmarks_candidate(self):
+ landmarks_map = self.landmarks_map
landmarks_candidate = []
self.beacon_no = 0
for landmark in landmarks_map:
self.beacon_no += 1
+ self.marker_id = 0
candidate = {
'landmark': landmark,
- 'obs_candidates': self.get_obs_candidate(landmark, obs_raw)
+ 'obs_candidates': self.get_obs_candidate(landmark)
}
landmarks_candidate.append(candidate)
+
+ if self.visualize_true:
+ self.remove_old_markers()
+ self.circles_pub.publish(self.marker_array)
+ # self.get_logger().debug("Published marker array")
+ self.marker_array.markers.clear() # clean up (is this enough?)
+
# print landmarks_candidate for debug
if self.debug_mode:
for i, landmark in enumerate(landmarks_candidate):
@@ -257,8 +304,10 @@ def get_landmarks_candidate(self, landmarks_map, obs_raw):
print(f"Obs {j + 1}: {obs_candidate['position']} with probability {obs_candidate['probability']}")
return landmarks_candidate
- def get_landmarks_set(self, landmarks_candidate):
+ def get_landmarks_set(self):
+ landmarks_candidate = self.landmarks_candidate
landmarks_set = []
+
for i in range(len(landmarks_candidate[0]['obs_candidates'])):
for j in range(len(landmarks_candidate[1]['obs_candidates'])):
for k in range(len(landmarks_candidate[2]['obs_candidates'])):
@@ -287,13 +336,16 @@ def get_landmarks_set(self, landmarks_candidate):
return landmarks_set
- def get_lidar_pose(self, landmarks_set, landmarks_map):
- if not landmarks_set:
- raise ValueError("landmarks_set is empty")
+ def get_lidar_pose(self):
+ landmarks_map = self.landmarks_map
+ landmarks_set = self.landmarks_set
+ # if not landmarks_set:
+ # raise ValueError("landmarks_set is empty")
+
# prefer the set with more beacons
- landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True)
- # with the most beacon possible, prefer the set with the highest probability_set; TODO: better way to sort?
- max_likelihood = max(set['probability_set'] for set in landmarks_set)
+ # landmarks_set = sorted(landmarks_set, key=lambda x: len(x['beacons']), reverse=True) # activate this if there's one more beacon
+ # with the most beacons possible, prefer the set with the highest probability_set; TODO: better way to sort?
+ max_likelihood = max(set['probability_set'] for set in landmarks_set) # TODO: sort by consistency or likelihood?
max_likelihood_idx = next(i for i, set in enumerate(landmarks_set) if set['probability_set'] == max_likelihood)
lidar_pose = np.zeros(3)
@@ -317,6 +369,8 @@ def get_lidar_pose(self, landmarks_set, landmarks_map):
try:
X = np.linalg.solve(A.T @ A, A.T @ b)
if X[0] < 0 or X[0] > 3 or X[1] < 0 or X[1] > 2:
+ # result is outside the field boundary; fall back to two beacons
+ self.get_two_beacons()
return
lidar_pose[0] = X[0]
lidar_pose[1] = X[1]
@@ -335,60 +389,141 @@ def get_lidar_pose(self, landmarks_set, landmarks_map):
lidar_cov[1, 1] /= max_likelihood
lidar_cov[2, 2] /= max_likelihood
- # publish the lidar pose
- self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg() # TODO: compensation
- self.lidar_pose_msg.header.frame_id = 'map' #TODO: param
- self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0]
- self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1]
- self.lidar_pose_msg.pose.pose.position.z = 0.0
- self.lidar_pose_msg.pose.pose.orientation.x = 0.0
- self.lidar_pose_msg.pose.pose.orientation.y = 0.0
- self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2)
- self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2)
- self.lidar_pose_msg.pose.covariance = [
- lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2]
- ]
- # self.get_logger().debug(f"lidar_pose: {lidar_pose}")
- self.lidar_pose_pub.publish(self.lidar_pose_msg)
- # self.get_logger().debug("Published lidar_pose message")
+ self.pub_lidar_pose(lidar_pose, lidar_cov)
except np.linalg.LinAlgError as e:
self.get_logger().warn("Linear algebra error: {}".format(e))
# use markerarray to show the landmarks it used
- if self.visualize_candidate:
- marker_array = MarkerArray()
- for i, beacon in enumerate(beacons):
- marker = Marker()
- marker.header.frame_id = "base_footprint"
- marker.header.stamp = self.get_clock().now().to_msg()
- marker.ns = "chosen_landmarks"
- marker.type = Marker.SPHERE
- marker.action = Marker.ADD
- marker.scale.x = 0.1
- marker.scale.y = 0.1
- marker.scale.z = 0.01
- marker.pose.position.x = beacon[0]
- marker.pose.position.y = beacon[1]
- marker.pose.position.z = 0.0
- marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0)
- marker.id = i
- marker_array.markers.append(marker)
- self.circles_pub.publish(marker_array)
- self.get_logger().debug("Published marker array")
- # clean up
- marker_array.markers.clear()
+ if self.visualize_true:
+ self.visualize_sets(beacons, max_likelihood, landmarks_set[max_likelihood_idx]['consistency'])
else:
self.get_logger().debug("not enough beacons")
return lidar_pose, lidar_cov
+ def visualize_sets(self, beacons, max_likelihood, consistency):
+ marker_array = MarkerArray()
+ for i, beacon in enumerate(beacons):
+ marker = Marker()
+ marker.header.frame_id = "base_footprint"
+ marker.header.stamp = self.get_clock().now().to_msg()
+ marker.ns = "chosen_landmarks"
+ marker.type = Marker.SPHERE
+ marker.action = Marker.ADD
+ marker.scale.x = 0.15
+ marker.scale.y = 0.15
+ marker.scale.z = 0.01
+ marker.pose.position.x = beacon[0]
+ marker.pose.position.y = beacon[1]
+ marker.pose.position.z = -0.1
+ if len(beacons) == 3:
+ marker.color = ColorRGBA(r=0.0, g=0.5, b=0.5, a=1.0)
+ elif len(beacons) == 2:
+ marker.color = ColorRGBA(r=0.5, g=0.5, b=0.0, a=1.0)
+ marker.id = i
+ marker_array.markers.append(marker)
+
+ if len(beacons) == 2:
+ # delete the stale third marker (marker ids are 0-based, so it is id 2)
+ marker_delete = Marker()
+ marker_delete.header.frame_id = "base_footprint"
+ marker_delete.ns = "chosen_landmarks"
+ marker_delete.id = 2
+ marker_delete.type = Marker.SPHERE
+ marker_delete.action = Marker.DELETE
+ marker_array.markers.append(marker_delete)
+
+ # add the max_likelihood, consistency to the marker array
+ marker = Marker()
+ marker.header.frame_id = "map"
+ marker.header.stamp = self.get_clock().now().to_msg()
+ marker.ns = "set_param"
+ marker.type = Marker.TEXT_VIEW_FACING
+ marker.action = Marker.ADD
+ marker.scale.z = 0.1
+ marker.pose.position.x = 3.5
+ marker.pose.position.y = 2.5
+ marker.pose.position.z = 0.5
+ marker.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=1.0)
+ marker.text = f"max_likelihood: {max_likelihood:.2f}, consistency: {consistency:.2f}"
+ marker.id = 10
+ marker_array.markers.append(marker)
+ # publish the marker array
+ self.circles_pub.publish(marker_array)
+ self.get_logger().debug("Published marker array")
+ # clean up
+ marker_array.markers.clear()
+
+ def visualize_candidates(self, obs, likelihood):
+
+ self.marker_id += 1
+
+ # circles
+ marker = Marker()
+ marker.header.frame_id = "base_footprint"
+ marker.header.stamp = self.get_clock().now().to_msg()
+ marker.ns = f"candidates_circle{self.beacon_no}"
+ marker.type = Marker.SPHERE
+ marker.action = Marker.ADD
+ marker.scale.x = 0.1
+ marker.scale.y = 0.1
+ marker.scale.z = 0.01
+
+ marker.pose.position.x = obs[0]
+ marker.pose.position.y = obs[1]
+ marker.pose.position.z = 0.0
+ if self.beacon_no == 1:
+ marker.color = ColorRGBA(r=1.0, g=0.0, b=0.0, a=likelihood) # Red for beacon 1
+ elif self.beacon_no == 2:
+ marker.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=likelihood) # Green for beacon 2
+ elif self.beacon_no == 3:
+ marker.color = ColorRGBA(r=0.0, g=0.0, b=1.0, a=likelihood) # Blue for beacon 3
+ marker.id = self.marker_id
+ self.marker_array.markers.append(marker)
+
+ # texts
+ text_marker = Marker()
+ text_marker.header.frame_id = "base_footprint"
+ text_marker.header.stamp = self.get_clock().now().to_msg()
+ text_marker.ns = f"candidates_text{self.beacon_no}"
+ text_marker.type = Marker.TEXT_VIEW_FACING
+ text_marker.action = Marker.ADD
+ text_marker.scale.z = 0.1
+ text_marker.color = ColorRGBA(r=0.0, g=0.0, b=0.0, a=0.2) # semi-transparent black text
+
+ text_marker.pose.position.x = obs[0] + 0.2
+ text_marker.pose.position.y = obs[1]
+ text_marker.pose.position.z = 0.1
+ text_marker.text = f"{likelihood:.2f}"
+ text_marker.id = self.marker_id
+ self.marker_array.markers.append(text_marker)
+
+ def remove_old_markers(self):
+ # remove the old markers
+ num_old_markers = self.marker_num_pre[self.beacon_no - 1] - self.marker_id
+ for i in range(num_old_markers):
+ old_marker = Marker()
+ old_marker.header.frame_id = "base_footprint"
+ old_marker.header.stamp = self.get_clock().now().to_msg()
+ old_marker.ns = f"candidates_circle{self.beacon_no}"
+ old_marker.type = Marker.SPHERE
+ old_marker.action = Marker.DELETE
+ old_marker.id = self.marker_id + i + 1 # only the ids above the current count are stale
+ self.marker_array.markers.append(old_marker)
+
+ old_text_marker = Marker()
+ old_text_marker.header.frame_id = "base_footprint"
+ old_text_marker.header.stamp = self.get_clock().now().to_msg()
+ old_text_marker.ns = f"candidates_text{self.beacon_no}"
+ old_text_marker.type = Marker.TEXT_VIEW_FACING
+ old_text_marker.action = Marker.DELETE
+ old_text_marker.id = self.marker_id + i + 1
+ self.marker_array.markers.append(old_text_marker)
+ # update the marker number
+ self.marker_num_pre[self.beacon_no - 1] = self.marker_id
+
def get_geometry_consistency(self, beacons):
geometry_description = {}
consistency = 1.0
@@ -416,6 +551,194 @@ def get_geometry_consistency(self, beacons):
consistency = 0
return consistency
+
+ def pub_lidar_pose(self, lidar_pose_, lidar_cov):
+ lidar_pose = self.pose_compensation(lidar_pose_)
+ # publish the lidar pose
+ self.lidar_pose_msg.header.stamp = self.get_clock().now().to_msg()
+ self.lidar_pose_msg.header.frame_id = 'map' #TODO: param
+ self.lidar_pose_msg.pose.pose.position.x = lidar_pose[0]
+ self.lidar_pose_msg.pose.pose.position.y = lidar_pose[1]
+ self.lidar_pose_msg.pose.pose.position.z = 0.0
+ self.lidar_pose_msg.pose.pose.orientation.x = 0.0
+ self.lidar_pose_msg.pose.pose.orientation.y = 0.0
+ self.lidar_pose_msg.pose.pose.orientation.z = np.sin(lidar_pose[2] / 2)
+ self.lidar_pose_msg.pose.pose.orientation.w = np.cos(lidar_pose[2] / 2)
+ self.lidar_pose_msg.pose.covariance = [
+ lidar_cov[0, 0], 0.0, 0.0, 0.0, 0.0, 0.0,
+ 0.0, lidar_cov[1, 1], 0.0, 0.0, 0.0, 0.0,
+ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+ 0.0, 0.0, 0.0, 0.0, 0.0, lidar_cov[2, 2]
+ ]
+ self.lidar_pose_pub.publish(self.lidar_pose_msg)
+
+ def get_two_beacons(self):
+
+ two_index = []
+ self.get_logger().info("get two beacons")
+
+ # get the indices of the beacons that have at least one candidate with probability above likelihood_threshold_two
+ for i in range(3):
+ if any(candidate['probability'] > self.likelihood_threshold_two for candidate in self.landmarks_candidate[i]['obs_candidates']):
+ two_index.append(i)
+
+ if len(two_index) < 2:
+ return
+
+ self.get_logger().info(f"two_index: {two_index}")
+
+ # nominal geometry
+ nominal_distance = np.linalg.norm(self.landmarks_map[two_index[0]] - self.landmarks_map[two_index[1]])
+ self.get_logger().info(f"nominal distance: {nominal_distance:.2f}")
+ angle1 = np.arctan2(self.landmarks_map[two_index[1]][1] - self.robot_pose[1], self.landmarks_map[two_index[1]][0] - self.robot_pose[0])
+ angle0 = np.arctan2(self.landmarks_map[two_index[0]][1] - self.robot_pose[1], self.landmarks_map[two_index[0]][0] - self.robot_pose[0])
+ nominal_angle = angle_limit_checking(angle1 - angle0)
+ # log the nominal angle in degrees
+ self.get_logger().info(f"nominal angle: {nominal_angle * 180 / np.pi:.2f} degrees")
+
+ # get the sets
+ for i in range(len(self.landmarks_candidate[two_index[0]]['obs_candidates'])):
+ for j in range(len(self.landmarks_candidate[two_index[1]]['obs_candidates'])):
+ set = {
+ 'beacons': {
+ 0: self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['position'],
+ 1: self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['position']
+ }
+ }
+ # geometry consistency
+ # 1. check the cross product: 'robot to beacon 0' cross 'robot to beacon 1'
+ cross = np.cross(set['beacons'][0], set['beacons'][1]) # TODO: check if beacon 0 and 1 will be in the order of beacon a, b and c
+ if self.side == 0: # yellow is clockwise(negative)
+ if cross > 0:
+ # self.get_logger().debug("cross product is positive")
+ continue
+ elif self.side == 1: # blue is counter-clockwise(positive)
+ if cross < 0:
+ # self.get_logger().debug("cross product is negative")
+ continue
+ # 2. check the distance between the two beacons
+ set['consistency'] = 1 - abs(np.linalg.norm(set['beacons'][0] - set['beacons'][1]) - nominal_distance) / nominal_distance
+ if set['consistency'] < self.consistency_threshold_two:
+ self.get_logger().debug(f"Geometry consistency is less than {self.consistency_threshold_two}: {set['consistency']}")
+ continue
+ # 3. check the angle between the two beacons
+ angle1 = np.arctan2(set['beacons'][1][1], set['beacons'][1][0])
+ angle0 = np.arctan2(set['beacons'][0][1], set['beacons'][0][0])
+ angle = angle_limit_checking(angle1 - angle0)
+ if abs(angle_limit_checking(angle - nominal_angle)) > np.pi / 18: # 10 degrees
+ self.get_logger().error(f"Angle difference is too large: {abs(angle_limit_checking(angle - nominal_angle)) * 180 / np.pi:.2f} degrees")
+ continue
+ # self.get_logger().info(f"angle1: {angle1 * 180 / np.pi:.2f} degrees")
+ # self.get_logger().info(f"angle0: {angle0 * 180 / np.pi:.2f} degrees")
+ # self.get_logger().info(f"candidate angle: {angle * 180 / np.pi:.2f} degrees")
+ # probability of the set
+ set['probability_set'] = self.landmarks_candidate[two_index[0]]['obs_candidates'][i]['probability'] * self.landmarks_candidate[two_index[1]]['obs_candidates'][j]['probability']
+ if set['probability_set'] < self.likelihood_threshold_two:
+ self.get_logger().debug(f"Probability is less than {self.likelihood_threshold_two}: {set['probability_set']}")
+ continue
+ self.landmarks_set.append(set)
+
+ # check if there is valid set
+ if len(self.landmarks_set) == 0:
+ self.get_logger().warn("no valid set")
+ return
+
+ # use the set with the highest probability
+ max_likelihood = max(set['probability_set'] for set in self.landmarks_set)
+ max_likelihood_idx = next(i for i, set in enumerate(self.landmarks_set) if set['probability_set'] == max_likelihood)
+ beacons = [self.landmarks_set[max_likelihood_idx]['beacons'][0], self.landmarks_set[max_likelihood_idx]['beacons'][1]]
+ # calculate the lidar pose
+ lidar_pose = self.get_lidar_pose_two(
+ two_index[0],
+ two_index[1],
+ beacons[0][0], beacons[0][1],
+ beacons[1][0], beacons[1][1]
+ )
+ # check if the lidar pose is in the map
+ if lidar_pose[0] < 0 or lidar_pose[0] > 3 or lidar_pose[1] < 0 or lidar_pose[1] > 2:
+ self.get_logger().debug("lidar pose is out of map")
+ return
+ lidar_cov = np.diag([0.1**2, 0.1**2, 0.1**2])
+ lidar_cov[0, 0] /= max_likelihood
+ lidar_cov[1, 1] /= max_likelihood
+ lidar_cov[2, 2] /= max_likelihood
+ # publish the lidar pose
+ self.pub_lidar_pose(lidar_pose, lidar_cov)
+ if self.visualize_true:
+ self.visualize_sets(beacons, max_likelihood, self.landmarks_set[max_likelihood_idx]['consistency'])
+
+ # also print the likelihood
+ self.get_logger().info(f"lidar_pose (two): {lidar_pose}")
+ self.get_logger().info(f"lidar_pose (two) likelihood: {max_likelihood}")
+
+ self.clear_data()
+
+ def get_lidar_pose_two(self, landmark_index_1, landmark_index_2, bxr, byr, cxr, cyr):
+
+ lidar_pose = np.zeros(3)
+ # the beacons position w.r.t. map
+ bxm = self.landmarks_map[landmark_index_1][0]
+ bym = self.landmarks_map[landmark_index_1][1]
+ cxm = self.landmarks_map[landmark_index_2][0]
+ cym = self.landmarks_map[landmark_index_2][1]
+ x = cxm - bxm
+ y = cym - bym
+ xr = cxr - bxr
+ yr = cyr - byr
+ # calculate the angle from robot to map
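+ # rotating the inter-beacon vector (xr, yr) in the robot frame by theta must reproduce (x, y) in the map frame,
+ # so [x, y]^T = R(theta) [xr, yr]^T is solved for cos(theta) and sin(theta) below (both vectors ideally have the same length)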
+ costheta = (xr*x + yr*y) / (xr**2 + yr**2)
+ sintheta = (xr*y - yr*x) / (xr**2 + yr**2)
+ lidar_pose[2] = np.arctan2(sintheta, costheta)
+ # find the linear translation from robot to map
+ # 1. transform beacon positions from w.r.t. robot to w.r.t. map
+ bx = costheta * bxr - sintheta * byr
+ by = sintheta * bxr + costheta * byr
+ cx = costheta * cxr - sintheta * cyr
+ cy = sintheta * cxr + costheta * cyr
+ # 2. linear translation from robot to map, take the average of the result from the two beacons
+ lidar_pose[0] = (bxm - bx)/2 + (cxm - cx)/2
+ lidar_pose[1] = (bym - by)/2 + (cym - cy)/2
+
+ return lidar_pose
+
+ def pose_compensation(self, lidar_pose):
+ # find the translation and rotation from obs_time to now using TF
+ try:
+ now_transform = self.tf_buffer.lookup_transform( # get the latest transform
+ self.robot_parent_frame_id,
+ self.robot_frame_id,
+ rclpy.time.Time(),
+ timeout=rclpy.duration.Duration(seconds=0.1)
+ )
+ relative_transform = now_transform # note: this aliases now_transform; only its translation is modified below
+ relative_transform.transform.translation.x -= self.predict_transform.transform.translation.x
+ relative_transform.transform.translation.y -= self.predict_transform.transform.translation.y
+ # find the relative rotation
+ q1_inv = [
+ self.predict_transform.transform.rotation.x,
+ self.predict_transform.transform.rotation.y,
+ self.predict_transform.transform.rotation.z,
+ -self.predict_transform.transform.rotation.w # Negate for inverse
+ ]
+
+ q0 = now_transform.transform.rotation
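+ # relative rotation from obs_time to now: Hamilton product q_rel = q_now * q_pred^-1
+ # (negating w above gives -q*, which represents the same inverse rotation for a unit quaternion)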
+ qr = [
+ q0.x * q1_inv[3] + q0.w * q1_inv[0] + q0.y * q1_inv[2] - q0.z * q1_inv[1],
+ q0.y * q1_inv[3] + q0.w * q1_inv[1] + q0.z * q1_inv[0] - q0.x * q1_inv[2],
+ q0.z * q1_inv[3] + q0.w * q1_inv[2] + q0.x * q1_inv[1] - q0.y * q1_inv[0],
+ q0.w * q1_inv[3] - q0.x * q1_inv[0] - q0.y * q1_inv[1] - q0.z * q1_inv[2]
+ ]
+
+ lidar_pose[0] += relative_transform.transform.translation.x
+ lidar_pose[1] += relative_transform.transform.translation.y
+ lidar_pose[2] += yaw_from_quaternion(qr[0], qr[1], qr[2], qr[3])
+ return lidar_pose
+ except (LookupException, ConnectivityException, ExtrapolationException) as e:
+ self.get_logger().error(f'Could not transform {self.robot_parent_frame_id} to {self.robot_frame_id}: {e}')
+ self.get_logger().error("Compensation: Could not transform the robot pose")
+ return lidar_pose
def quaternion_from_euler(ai, aj, ak):
ai /= 2.0
@@ -438,14 +761,7 @@ def quaternion_from_euler(ai, aj, ak):
q[3] = cj * cc + sj * ss
return q
-def euler_from_quaternion(x, y, z, w):
- t0 = +2.0 * (w * x + y * z)
- t1 = +1.0 - 2.0 * (x * x + y * y)
- roll_x = np.arctan2(t0, t1)
- t2 = +2.0 * (w * y - z * x)
- t2 = +1.0 if t2 > +1.0 else t2
- t2 = -1.0 if t2 < -1.0 else t2
- pitch_y = np.arcsin(t2)
+def yaw_from_quaternion(x, y, z, w):
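+ # extract only the yaw (rotation about the z-axis) from the quaternion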
t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y * y + z * z)
yaw_z = np.arctan2(t3, t4)