Add support for rosidl::Buffer-aware per-endpoint pub/sub #867
Conversation
Signed-off-by: CY Chen <cyc@nvidia.com>
hidmic
left a comment
First pass. Another PR that needs many 👀.
```cpp
bool gid_equal(const rmw_gid_t & a, const rmw_gid_t & b)
{
  return std::memcmp(a.data, b.data, RMW_GID_STORAGE_SIZE) == 0;
}
```
@nvcyc there's rmw_compare_gids_equal() already
```cpp
for (const auto & endpoint : info->buffer_endpoints_) {
  uint32_t serialized_size = callbacks->get_serialized_size(ros_message);
  size_t buffer_size = serialized_size + 4096;
```
@nvcyc mind explaining those extra 4 KB?
```cpp
auto gid_to_hex = [](const rmw_gid_t & gid, size_t bytes = 8) -> std::string {
  static const char hex_chars[] = "0123456789abcdef";
  std::string result;
  result.reserve(bytes * 2);
  for (size_t i = 0; i < bytes && i < RMW_GID_STORAGE_SIZE; ++i) {
    result += hex_chars[(gid.data[i] >> 4) & 0xF];
    result += hex_chars[gid.data[i] & 0xF];
  }
  return result;
};
```
@nvcyc nit: consider making this a free function.
```cpp
  sub_info.gid.data, RMW_GID_STORAGE_SIZE);
// ...
eprosima::fastdds::dds::TopicQos topic_qos =
  info->participant_->get_default_topic_qos();
```
@nvcyc meta: we should notify the user somehow about the actual QoS profile used for buffer-aware pub/sub, or the fallback to CPU-only, or something.
```cpp
const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};
// ...
auto ret_code = endpoint->data_reader->take(data_values, info_seq, 1);
```
@nvcyc meta: how do we enforce that we don't starve some endpoints? This implicitly prioritizes endpoints that were discovered earlier than others.
```cpp
if (subscription_options->acceptable_buffer_backends &&
  strlen(subscription_options->acceptable_buffer_backends) > 0)
{
  std::string acceptable(subscription_options->acceptable_buffer_backends);
```
@nvcyc there's rcpputils::split() for this.
```cpp
if (backends.empty()) {
  return {};
}
std::string result = BUFFER_BACKEND_SENTINEL;
```
```cpp
  "rmw_fastrtps_cpp",
  "BufferEndpointRegistry: firing %zu publisher callback(s) for discovered subscriber on '%s'",
  callbacks.size(), info.topic_name.c_str());
for (const auto & cb : callbacks) {
```
This looks safe from deadlock risk.
```cpp
  "rmw_fastrtps_cpp",
  "BufferEndpointRegistry: firing %zu subscriber callback(s) for discovered publisher on '%s'",
  callbacks.size(), info.topic_name.c_str());
for (const auto & cb : callbacks) {
```
hidmic
left a comment
Looks like things are in flight. @MiguelCompany @richiware we need your 👀 here.
```cpp
    return;
  }
}
```
Should call rosidl_buffer_backend_registry::BufferBackendRegistry::get_instance().notify_endpoint_discovered() around here.
```cpp
    return;
  }
}
```
Should call rosidl_buffer_backend_registry::BufferBackendRegistry::get_instance().notify_endpoint_discovered() around here.
```cpp
size_t total_matched = info->publisher_event_->subscription_count();
size_t buffer_count = state.endpoints.size() + state.pending.size();
bool has_non_buffer_subscribers = (total_matched > buffer_count);
```
Return directly if has_non_buffer_subscribers is true, as a fallback for when legacy RMWs appear in any peer nodes. That is, every subscriber should get CPU-based messages from its main data reader if one or more subscribers run a legacy (older-version) RMW that does not support the buffer backend protocol and only works with the main data reader/writer.
Description
This pull request adds full `rosidl::Buffer` support to `rmw_fastrtps_cpp`, enabling per-endpoint DDS DataWriters/DataReaders for zero-copy buffer transport between compatible backends. Backend discovery uses DDS `user_data` QoS to advertise and detect supported backends.

This pull request modifies the following packages:

`rmw_fastrtps_shared_cpp` -- shared types and QoS

- `custom_participant_info.hpp`: Added `BufferDiscoveryCallback` type and `buffer_discovery_cb_` field for participant-level buffer discovery.
- `custom_publisher_info.hpp`: Added `BufferPublisherEndpoint` struct and buffer-related fields (`is_buffer_aware_`, `backend_aux_info_`, `local_endpoint_info_`, `buffer_mutex_`, `buffer_endpoints_`, `pending_buffer_endpoints_`, `buffer_alive_flag_`, `participant_`, `dds_publisher_`).
- `custom_subscriber_info.hpp`: Added `BufferSubscriptionEndpoint` struct (with `owns_topic` flag for single-process topic reuse) and buffer-related fields. Added `RMWSubscriptionEvent::notify_buffer_data_available()`.
- `qos.hpp`/`qos.cpp`: Added optional `buffer_backends` parameter to QoS functions; added `encode_buffer_backends_for_user_data()`/`parse_buffer_backends_from_user_data()` (format: `BUFBE:name=aux;name2=aux2`).
- `rmw_wait.cpp`: Extended WaitSet to also check `buffer_data_guard_`; wakes the subscription when buffer data is available.

`rmw_fastrtps_cpp` -- buffer-aware per-endpoint pub/sub

- Init/shutdown: `initialize_buffer_backends()`/`shutdown_buffer_backends()` during `rmw_init`/`rmw_shutdown`.
- `buffer_endpoint_registry`: Per-process singleton mapping topic names to discovery callbacks; publishers register subscriber-discovery callbacks and vice versa. Maintains known endpoints so late-registering callbacks see already-discovered peers.
- Publisher creation: Checks `callbacks->has_buffer_fields`; encodes backend names in DDS `user_data` with `"cpu"` always present. Registers subscriber-discovery callbacks that create per-subscriber DataWriters on unique topics (`{topic}/_buf/{pub_gid}_{sub_gid}`) with reliable QoS, synchronous publish mode, and data sharing off. Discovery callbacks use `buffer_alive_flag_` to prevent use-after-free.
- Subscription creation: Parses `acceptable_buffer_backends` (NULL/empty/`"cpu"`: CPU-only; `"any"`: all; specific names: filtered). Creates per-publisher DataReaders with a `BufferDataReaderListener` that triggers `buffer_data_guard_`.
- Topic reuse: In single-process scenarios, uses `lookup_topicdescription()` to reuse existing topics (`owns_topic = false`).
- Publish (`rmw_publish.cpp`): Iterates per-subscriber endpoints, calls `cdr_serialize_with_endpoint()`. Conditionally publishes to the main DataWriter only when non-buffer-aware subscribers exist, avoiding unnecessary CPU conversion.
- Take (`rmw_take.cpp`): Iterates per-publisher DataReaders, takes raw CDR bytes and calls `cdr_deserialize_with_endpoint()`. Falls back to the main DataReader when no buffer endpoint data is available. Guard condition re-arm: re-sets `buffer_data_guard_` when remaining data exists.
- Locking pattern (as in `rmw_zenoh_cpp`): (1) acquire `buffer_mutex_` to check duplicates and mark pending; (2) call DDS create/delete APIs without holding the lock; (3) re-acquire to store endpoints. Destroy functions move endpoints to local lists under lock and clean up DDS resources after releasing.

Is this user-facing behavior change?
This pull request does not change existing `rmw_fastrtps` behavior for standard messages. For messages with `uint8[]` fields, the per-endpoint transport is transparent -- publishers and subscribers share backend info via DDS `user_data`, and CPU fallback ensures correctness when backends are incompatible.

Did you use Generative AI?
Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.
Additional Information
This PR is part of the broader ROS 2 native buffer feature introduced in this post.