Skip to content

Commit fa926e9

Browse files
Add worker thread to ImageBuffer (#58)
2 parents 7fb6691 + 0293a14 commit fa926e9

File tree

3 files changed

+195
-83
lines changed

3 files changed

+195
-83
lines changed

ed_sensor_integration/include/ed/kinect/image_buffer.h

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
#ifndef ED_SENSOR_INTEGRATION_IMAGE_BUFFER_H_
22
#define ED_SENSOR_INTEGRATION_IMAGE_BUFFER_H_
33

4-
#include <queue>
54
#include <ros/callback_queue.h>
65
#include <rgbd/types.h>
76
#include <geolib/datatypes.h>
87

8+
#include <forward_list>
9+
#include <memory>
10+
#include <mutex>
11+
#include <thread>
12+
913
namespace rgbd
1014
{
1115
class Client;
@@ -16,6 +20,11 @@ namespace tf
1620
class TransformListener;
1721
}
1822

23+
/**
24+
* @brief The ImageBuffer class provides a buffer to synchronise rgbd images with sensor positions.
25+
* Images are stored until they are removed from the buffer or until a
26+
* certain amount of time has passed
27+
*/
1928
class ImageBuffer
2029
{
2130

@@ -25,26 +34,67 @@ class ImageBuffer
2534

2635
~ImageBuffer();
2736

28-
void initialize(const std::string& topic);
29-
30-
// Polls to see if there is a new image with transform. If not, returns false
31-
bool nextImage(const std::string& root_frame, rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose);
32-
33-
// Blocks until a new image with transform is found. Returns false if no image or tf could be found within 'timeout_sec' seconds
34-
bool waitForRecentImage(const std::string& root_frame, rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose, double timeout_sec);
37+
/**
38+
* @brief Configure the buffer
39+
* @param topic ros topic on which the rgbd images are published
40+
* @param root_frame root frame to calculate sensor pose relative to (default: map)
41+
* @param worker_thread_frequency frequency at which the worker thread updates the most recent image (default: 20)
42+
*/
43+
void initialize(const std::string& topic, const std::string& root_frame="map", const float worker_thread_frequency=20);
44+
45+
/**
46+
* @brief Returns the most recent combination of image and transform, provided it is different from the previous time the function was called.
47+
* @param[out] image rgbd image to write the next image to. Iff a next image is found
48+
* @param[out] sensor_pose will be filled with the sensor pose corresponding to the next image. Iff a next image is found
49+
* @return whether or not a novel image is available
50+
*/
51+
bool nextImage(rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose);
52+
53+
/**
54+
* @brief Blocks until a new image with transform is found. Returns false if no image or TF could be found within 'timeout_sec' seconds.
55+
* But will always give it one try, both to get the image and to get TF.
56+
* @param[out] image rgbd image to write the next image to. Iff a next image is found
57+
* @param[out] sensor_pose will be filled with the sensor pose corresponding to the next image. Iff a next image is found
58+
* @param[in] timeout_sec maximum duration to block.
59+
* @return whether or not a next image was available within the timeout duration.
60+
*/
61+
bool waitForRecentImage(rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose, double timeout_sec);
3562

3663
private:
3764

38-
std::string topic_;
65+
std::string root_frame_;
3966

40-
rgbd::Client* kinect_client_;
67+
std::unique_ptr<rgbd::Client> rgbd_client_;
4168

42-
tf::TransformListener* tf_listener_;
69+
std::unique_ptr<tf::TransformListener> tf_listener_;
4370

44-
std::queue<rgbd::ImageConstPtr> image_buffer_;
71+
/**
72+
* @brief Newer images should be pushed on the front. This will result in the front being the most recent and the back being the oldest
73+
*/
74+
std::forward_list<rgbd::ImageConstPtr> image_buffer_;
4575

4676
ros::CallbackQueue cb_queue_;
4777

78+
std::pair<rgbd::ImageConstPtr, geo::Pose3D> recent_image_;
79+
/**
80+
* @brief For protecting ImageBuffer::recent_image_
81+
*/
82+
std::mutex recent_image_mutex_;
83+
std::unique_ptr<std::thread> worker_thread_ptr_;
84+
bool shutdown_; // Trigger to kill the worker thread
85+
86+
/**
87+
* @brief Iterates over the buffer and tries to get TF for the most recent image. Deletes image and all older images when succesful
88+
* or when image is too old from the buffer
89+
* @return Indicates whether the most recent image has been updated
90+
*/
91+
bool getMostRecentImageTF();
92+
93+
/**
94+
* @brief Calls ImageBuffer::getMostRecentImageTF on the specified frequency
95+
* @param frequency Frequency for checking new images.
96+
*/
97+
void workerThreadFunc(const float frequency=20);
4898

4999
};
50100

ed_sensor_integration/src/kinect/image_buffer.cpp

Lines changed: 130 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,77 +7,85 @@
77

88
// ----------------------------------------------------------------------------------------------------
99

10-
ImageBuffer::ImageBuffer() : kinect_client_(nullptr), tf_listener_(nullptr)
10+
ImageBuffer::ImageBuffer() : rgbd_client_(nullptr), tf_listener_(nullptr), shutdown_(false)
1111
{
1212
}
1313

1414
// ----------------------------------------------------------------------------------------------------
1515

1616
ImageBuffer::~ImageBuffer()
1717
{
18-
if (kinect_client_)
19-
delete kinect_client_;
20-
if (tf_listener_)
21-
delete tf_listener_;
18+
shutdown_ = true;
19+
if (worker_thread_ptr_)
20+
worker_thread_ptr_->join();
2221
}
2322

2423
// ----------------------------------------------------------------------------------------------------
2524

26-
void ImageBuffer::initialize(const std::string& topic)
25+
void ImageBuffer::initialize(const std::string& topic, const std::string& root_frame, const float worker_thread_frequency)
2726
{
28-
if (!kinect_client_)
29-
kinect_client_ = new rgbd::Client;
27+
root_frame_ = root_frame;
3028

31-
kinect_client_->initialize(topic);
29+
if (!rgbd_client_)
30+
rgbd_client_ = std::make_unique<rgbd::Client>();
31+
32+
rgbd_client_->initialize(topic);
3233

3334
if (!tf_listener_)
34-
tf_listener_ = new tf::TransformListener;
35+
tf_listener_ = std::make_unique<tf::TransformListener>();
36+
37+
worker_thread_ptr_ = std::make_unique<std::thread>(&ImageBuffer::workerThreadFunc, this, worker_thread_frequency);
3538
}
3639

3740
// ----------------------------------------------------------------------------------------------------
3841

39-
bool ImageBuffer::waitForRecentImage(const std::string& root_frame, rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose, double timeout_sec)
42+
bool ImageBuffer::waitForRecentImage(rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose, double timeout_sec)
4043
{
41-
if (!kinect_client_)
44+
if (!rgbd_client_)
45+
{
46+
ROS_ERROR_NAMED("image_buffer", "[IMAGE_BUFFER] No RGBD client");
4247
return false;
48+
}
4349

4450
// - - - - - - - - - - - - - - - - - -
4551
// Wait until we get a new image
4652

4753
ros::Time t_start = ros::Time::now();
54+
ros::Time t_end = t_start + ros::Duration(timeout_sec);
4855

4956
rgbd::ImageConstPtr rgbd_image;
50-
while(ros::ok())
57+
do
5158
{
52-
if (ros::Time::now() - t_start > ros::Duration(timeout_sec))
53-
return false;
54-
55-
rgbd_image = kinect_client_->nextImage();
59+
rgbd_image = rgbd_client_->nextImage();
5660

5761
if (rgbd_image)
5862
break;
63+
else if (ros::Time::now() > t_end)
64+
return false;
5965
else
6066
ros::Duration(0.1).sleep();
6167
}
68+
while(ros::ok()); // Give it minimal one go
69+
6270

6371
// - - - - - - - - - - - - - - - - - -
6472
// Wait until we have a tf
65-
66-
if (!tf_listener_->waitForTransform(root_frame, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), ros::Duration(timeout_sec)))
67-
return false;
73+
if (!tf_listener_->canTransform(root_frame_, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()))) // Get the TF when it is available now
74+
if (!tf_listener_->waitForTransform(root_frame_, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), t_end - ros::Time::now()))
75+
return false;
6876

6977
// - - - - - - - - - - - - - - - - - -
7078
// Calculate tf
7179

7280
try
7381
{
7482
tf::StampedTransform t_sensor_pose;
75-
tf_listener_->lookupTransform(root_frame, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), t_sensor_pose);
83+
tf_listener_->lookupTransform(root_frame_, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), t_sensor_pose);
7684
geo::convert(t_sensor_pose, sensor_pose);
7785
}
7886
catch(tf::TransformException& ex)
7987
{
80-
ROS_ERROR("[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
88+
ROS_ERROR_NAMED("image_buffer", "[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
8189
return false;
8290
}
8391

@@ -91,75 +99,129 @@ bool ImageBuffer::waitForRecentImage(const std::string& root_frame, rgbd::ImageC
9199

92100
// ----------------------------------------------------------------------------------------------------
93101

94-
bool ImageBuffer::nextImage(const std::string& root_frame, rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose)
102+
bool ImageBuffer::nextImage(rgbd::ImageConstPtr& image, geo::Pose3D& sensor_pose)
95103
{
96-
if (!kinect_client_)
104+
std::lock_guard<std::mutex> lg(recent_image_mutex_);
105+
if(!recent_image_.first)
106+
{
107+
ROS_DEBUG("[IMAGE_BUFFER] No new image");
97108
return false;
109+
}
98110

99-
// - - - - - - - - - - - - - - - - - -
100-
// Fetch kinect image and place in image buffer
111+
image = recent_image_.first;
112+
sensor_pose = recent_image_.second;
101113

102-
rgbd::ImageConstPtr rgbd_image = kinect_client_->nextImage();
103-
if (rgbd_image)
104-
image_buffer_.push(rgbd_image);
114+
recent_image_.first.reset(); // Invalidate the most recent image
105115

106-
if (image_buffer_.empty())
107-
return false;
116+
return true;
117+
}
118+
119+
// ----------------------------------------------------------------------------------------------------
108120

109-
rgbd_image = image_buffer_.front();
121+
bool ImageBuffer::getMostRecentImageTF()
122+
{
123+
if (!rgbd_client_)
124+
{
125+
ROS_ERROR("[IMAGE_BUFFER] No RGBD client");
126+
return false;
127+
}
110128

111129
// - - - - - - - - - - - - - - - - - -
112-
// Determine absolute kinect pose based on TF
130+
// Fetch kinect image and place in image buffer
113131

114-
try
115132
{
116-
tf::StampedTransform t_sensor_pose;
117-
tf_listener_->lookupTransform(root_frame, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), t_sensor_pose);
118-
geo::convert(t_sensor_pose, sensor_pose);
119-
image_buffer_.pop();
120-
133+
rgbd::ImageConstPtr new_image = rgbd_client_->nextImage();
134+
if (new_image)
135+
{
136+
image_buffer_.push_front(new_image);
137+
ROS_DEBUG_STREAM_NAMED("image_buffer", "[IMAGE_BUFFER] New image from the RGBD client with timestamp: " << new_image->getTimestamp());
138+
}
139+
else
140+
{
141+
ROS_DEBUG_NAMED("image_buffer", "[IMAGE_BUFFER] No new image from the RGBD client");
142+
}
121143
}
122-
catch(tf::ExtrapolationException& ex)
144+
145+
geo::Pose3D sensor_pose;
146+
147+
for (std::forward_list<rgbd::ImageConstPtr>::iterator it = image_buffer_.begin(); it != image_buffer_.end(); ++it)
123148
{
149+
rgbd::ImageConstPtr& rgbd_image = *it;
124150
try
125151
{
126-
// Now we have to check if the error was an interpolation or extrapolation error (i.e., the image is too old or
127-
// to new, respectively). If it is too old, discard it.
128-
129-
tf::StampedTransform latest_sensor_pose;
130-
tf_listener_->lookupTransform(root_frame, rgbd_image->getFrameId(), ros::Time(0), latest_sensor_pose);
131-
// If image time stamp is older than latest transform, throw it out
132-
if ( latest_sensor_pose.stamp_ > ros::Time(rgbd_image->getTimestamp()) )
152+
tf::StampedTransform t_sensor_pose;
153+
tf_listener_->lookupTransform(root_frame_, rgbd_image->getFrameId(), ros::Time(rgbd_image->getTimestamp()), t_sensor_pose);
154+
geo::convert(t_sensor_pose, sensor_pose);
155+
}
156+
catch (tf::ExtrapolationException& ex)
157+
{
158+
try
133159
{
134-
image_buffer_.pop();
135-
ROS_ERROR_STREAM_DELAYED_THROTTLE(10, "[IMAGE_BUFFER] Image too old to look-up tf: image timestamp = " << std::fixed
136-
<< ros::Time(rgbd_image->getTimestamp()));
137-
ROS_WARN_STREAM("[IMAGE_BUFFER] Image too old to look-up tf: image timestamp = " << std::fixed
138-
<< ros::Time(rgbd_image->getTimestamp()));
160+
// Now we have to check if the error was an interpolation or extrapolation error (i.e., the image is too old or
161+
// to new, respectively). If it is too old, discard it.
162+
tf::StampedTransform latest_sensor_pose;
163+
tf_listener_->lookupTransform(root_frame_, rgbd_image->getFrameId(), ros::Time(0), latest_sensor_pose);
164+
// If image time stamp is older than latest transform, the image is too old and the tf data is not available anymore
165+
if ( latest_sensor_pose.stamp_ > ros::Time(rgbd_image->getTimestamp()) )
166+
{
167+
168+
ROS_DEBUG_STREAM_NAMED("image_buffer", "[IMAGE_BUFFER] Image too old to look-up tf. Deleting all images older than timestamp: " << std::fixed
169+
<< ros::Time(rgbd_image->getTimestamp()));
170+
// Deleting this image and all older images
171+
image_buffer_.erase_after(it, image_buffer_.end());
172+
return false;
173+
}
174+
else
175+
{
176+
// Image is too new; continue to next image, which is older
177+
ROS_DEBUG_STREAM_DELAYED_THROTTLE_NAMED(10, "image_buffer", "[IMAGE_BUFFER] Image too new to look-up tf: image timestamp: " << std::fixed << ros::Time(rgbd_image->getTimestamp()) << ", what: " << ex.what());
178+
continue;
179+
}
180+
}
181+
catch (tf::TransformException& ex)
182+
{
183+
ROS_ERROR_DELAYED_THROTTLE_NAMED(10, "image_buffer", "[IMAGE_BUFFER] Could not get latest sensor pose (probably because tf is still initializing): %s", ex.what());
184+
ROS_WARN_NAMED("image_buffer", "[IMAGE_BUFFER] Could not get latest sensor pose (probably because tf is still initializing): %s", ex.what());
185+
continue;
139186
}
140-
141-
return false;
142187
}
143-
catch(tf::TransformException& exc)
188+
catch (tf::TransformException& ex)
144189
{
145-
ROS_ERROR_DELAYED_THROTTLE(10, "[IMAGE_BUFFER] Could not get latest sensor pose (probably because tf is still initializing): %s", ex.what());
146-
ROS_WARN("[IMAGE_BUFFER] Could not get latest sensor pose (probably because tf is still initializing): %s", ex.what());
147-
return false;
190+
ROS_ERROR_DELAYED_THROTTLE_NAMED(10, "image_buffer", "[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
191+
ROS_WARN_NAMED("image_buffer", "[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
192+
continue;
148193
}
149-
}
150-
catch(tf::TransformException& ex)
151-
{
152-
ROS_ERROR_DELAYED_THROTTLE(10, "[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
153-
ROS_WARN("[IMAGE_BUFFER] Could not get sensor pose: %s", ex.what());
154-
return false;
194+
195+
// Convert from ROS coordinate frame to geolib coordinate frame
196+
sensor_pose.R = sensor_pose.R * geo::Matrix3(1, 0, 0, 0, -1, 0, 0, 0, -1);
197+
198+
{
199+
std::lock_guard<std::mutex> lg(recent_image_mutex_);
200+
recent_image_.first = rgbd_image;
201+
recent_image_.second = sensor_pose;
202+
}
203+
204+
// Deleting this image and all older images
205+
image_buffer_.erase_after(it, image_buffer_.end());
206+
207+
return true;
155208
}
156209

157-
// Convert from ROS coordinate frame to geolib coordinate frame
158-
sensor_pose.R = sensor_pose.R * geo::Matrix3(1, 0, 0, 0, -1, 0, 0, 0, -1);
210+
// Not been able to update the most recent image/TF combo
211+
return false;
212+
}
159213

160-
image = rgbd_image;
214+
// ----------------------------------------------------------------------------------------------------
161215

162-
return true;
216+
void ImageBuffer::workerThreadFunc(const float frequency)
217+
{
218+
ros::Rate r(frequency);
219+
while(!shutdown_)
220+
{
221+
if (!getMostRecentImageTF())
222+
ROS_ERROR_DELAYED_THROTTLE_NAMED(5, "image_buffer", "[IMAGE_BUFFER] Could not get a pose for any image in the buffer");
223+
r.sleep();
224+
}
163225
}
164226

165227
// ----------------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)