Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <shared_mutex>

#include <algorithm>
#include <iterator>
#include <memory>
#include <stdexcept>
Expand Down Expand Up @@ -386,6 +387,39 @@ class IntraProcessManager
std::vector<uint64_t> take_ownership_subscriptions;
};

/// Hash function for rmw_gid_t to enable use in unordered_map
struct rmw_gid_hash
{
std::size_t operator()(const rmw_gid_t & gid) const noexcept
{
// Using the FNV-1a hash algorithm on the gid data
constexpr std::size_t FNV_prime = 1099511628211u;
std::size_t result = 14695981039346656037u;

for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
result ^= gid.data[i];
result *= FNV_prime;
}
return result;
}
};

/// Equality comparison for rmw_gid_t to enable use in unordered_map
struct rmw_gid_equal
{
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
{
// Compare the data bytes only.
// implementation_identifier pointer comparison is not used here because
// intra-process communication is always within the same process and RMW,
// and pointer comparison is fragile across dynamically loaded components.
Comment thread
fujitatomoya marked this conversation as resolved.
return std::equal(
std::begin(lhs.data),
std::end(lhs.data),
std::begin(rhs.data));
}
};
Comment thread
fujitatomoya marked this conversation as resolved.

using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

Expand All @@ -398,6 +432,16 @@ class IntraProcessManager
using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;

/// Structure to store publisher information in GID lookup map
struct PublisherInfo
{
uint64_t pub_id;
rclcpp::PublisherBase::WeakPtr publisher;
};

using GidToPublisherInfoMap =
std::unordered_map<rmw_gid_t, PublisherInfo, rmw_gid_hash, rmw_gid_equal>;

RCLCPP_PUBLIC
static
uint64_t
Expand Down Expand Up @@ -642,6 +686,8 @@ class IntraProcessManager
PublisherBufferMap publisher_buffers_;

mutable std::shared_timed_mutex mutex_;

GidToPublisherInfoMap gid_to_publisher_info_;
Comment thread
fujitatomoya marked this conversation as resolved.
};

} // namespace experimental
Expand Down
38 changes: 29 additions & 9 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
}
}

// Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr)
gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher};

// Initialize the subscriptions storage for this publisher.
pub_to_subs_[pub_id] = SplittedSubscriptions();

Expand Down Expand Up @@ -98,6 +101,24 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

// Remove GID to publisher info mapping.
// First try via the publisher's own GID (fast path).
auto pub_it = publishers_.find(intra_process_publisher_id);
if (pub_it != publishers_.end()) {
auto publisher = pub_it->second.lock();
if (publisher) {
gid_to_publisher_info_.erase(publisher->get_gid());
} else {
// Publisher weak_ptr already expired, fall back to linear scan by pub_id.
for (auto git = gid_to_publisher_info_.begin(); git != gid_to_publisher_info_.end(); ++git) {
if (git->second.pub_id == intra_process_publisher_id) {
gid_to_publisher_info_.erase(git);
break;
Comment thread
fujitatomoya marked this conversation as resolved.
}
}
Comment thread
fujitatomoya marked this conversation as resolved.
}
Comment thread
fujitatomoya marked this conversation as resolved.
}

publishers_.erase(intra_process_publisher_id);
publisher_buffers_.erase(intra_process_publisher_id);
pub_to_subs_.erase(intra_process_publisher_id);
Expand All @@ -108,16 +129,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
// Single O(1) hash map lookup - struct contains both ID and weak_ptr
auto it = gid_to_publisher_info_.find(*id);
if (it == gid_to_publisher_info_.end()) {
return false;
}
return false;

// Verify the publisher still exists by checking the weak_ptr
auto publisher = it->second.publisher.lock();
return publisher != nullptr;
}

size_t
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/test/rclcpp/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,14 @@ class PublisherBase
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
: topic_name(topic),
qos_profile(qos)
{}
{
// Initialize a mock GID with unique data based on this pointer
gid_.implementation_identifier = "mock_rmw";
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
Comment thread
fujitatomoya marked this conversation as resolved.
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
}
}

virtual ~PublisherBase()
{}
Expand Down Expand Up @@ -192,6 +199,12 @@ class PublisherBase
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
}

const rmw_gid_t &
get_gid() const
{
return gid_;
}

bool
operator==([[maybe_unused]] const rmw_gid_t & gid) const
{
Expand All @@ -210,6 +223,7 @@ class PublisherBase
private:
std::string topic_name;
rclcpp::QoS qos_profile;
rmw_gid_t gid_;
};

template<typename T, typename Alloc = std::allocator<void>>
Expand Down