Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions data_tamer_cpp/include/data_tamer/sinks/ros2_publisher_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,79 @@
#include "data_tamer_msgs/msg/schemas.hpp"
#include "data_tamer_msgs/msg/snapshot.hpp"
#include <unordered_map>
#include <type_traits>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_lifecycle/lifecycle_node.hpp>
#include <rclcpp/node_interfaces/node_interfaces.hpp>
#include <rclcpp/node_interfaces/node_topics_interface.hpp>

namespace DataTamer
{

using PublisherNodeInterfaces =
rclcpp::node_interfaces::NodeInterfaces<rclcpp::node_interfaces::NodeTopicsInterface>;

class ROS2PublisherSink : public DataSinkBase
{
public:
ROS2PublisherSink(std::shared_ptr<rclcpp::Node> node, const std::string& topic_prefix);
template <typename NodeT>
ROS2PublisherSink(NodeT&& nodelike, const std::string& topic_prefix)
: node_interface_(normalize_node(nodelike))
{
create_publishers(topic_prefix);
}

void addChannel(const std::string& name, const Schema& schema) override;

ROS2PublisherSink(std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node,
const std::string& topic_prefix);
bool storeSnapshot(const Snapshot& snapshot) override;

private:
template <typename NodeT>
void create_publishers(NodeT& node, const std::string& topic_prefix)
static PublisherNodeInterfaces normalize_node(NodeT&& nodelike)
{
using D = std::decay_t<NodeT>;

// use a friendlier compile error than the one that would otherwise come out
static_assert(
std::is_same_v<D, PublisherNodeInterfaces> ||
std::is_same_v<D, std::shared_ptr<rclcpp::Node>> ||
std::is_same_v<D, std::shared_ptr<rclcpp_lifecycle::LifecycleNode>> ||
std::is_constructible_v<PublisherNodeInterfaces, D&>,
"ROS2PublisherSink: unsupported node-like type passed to "
"`ROS2PublisherSink(NodeT&& nodelike, const std::string& topic_prefix)`. Pass a "
"rclcpp::Node, "
"rclcpp_lifecycle::LifecycleNode, a shared_ptr to either, or a "
"PublisherNodeInterfaces.");

if constexpr(std::is_same_v<D, PublisherNodeInterfaces>)
{
return nodelike;
}
else if constexpr(std::is_same_v<D, std::shared_ptr<rclcpp::Node>> ||
std::is_same_v<D, std::shared_ptr<rclcpp_lifecycle::LifecycleNode>>)
{
return PublisherNodeInterfaces(*nodelike);
}
else
{
return PublisherNodeInterfaces(nodelike);
}
}

void create_publishers(const std::string& topic_prefix)
{
rclcpp::QoS schemas_qos{ rclcpp::KeepAll() };
schemas_qos.reliable();
schemas_qos.transient_local(); // latch

const rclcpp::QoS data_qos{ rclcpp::KeepAll() };

schema_publisher_ = node->template create_publisher<data_tamer_msgs::msg::Schemas>(
topic_prefix + "/schemas", schemas_qos);
data_publisher_ = node->template create_publisher<data_tamer_msgs::msg::Snapshot>(
topic_prefix + "/data", data_qos);
schema_publisher_ = rclcpp::create_publisher<data_tamer_msgs::msg::Schemas>(
node_interface_, topic_prefix + "/schemas", schemas_qos);
data_publisher_ = rclcpp::create_publisher<data_tamer_msgs::msg::Snapshot>(
node_interface_, topic_prefix + "/data", data_qos);
}

void addChannel(const std::string& name, const Schema& schema) override;

bool storeSnapshot(const Snapshot& snapshot) override;

private:
std::unordered_map<std::string, Schema> schemas_;
Mutex schema_mutex_;

Expand All @@ -47,6 +86,9 @@ class ROS2PublisherSink : public DataSinkBase

bool schema_changed_ = true;
data_tamer_msgs::msg::Snapshot data_msg_;

// ---- Stored node façade ----
PublisherNodeInterfaces node_interface_;
};

} // namespace DataTamer
12 changes: 0 additions & 12 deletions data_tamer_cpp/src/sinks/ros2_publisher_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@

namespace DataTamer
{
ROS2PublisherSink::ROS2PublisherSink(std::shared_ptr<rclcpp::Node> node,
const std::string& topic_prefix)
{
create_publishers(node, topic_prefix);
}

ROS2PublisherSink::ROS2PublisherSink(
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node,
const std::string& topic_prefix)
{
create_publishers(node, topic_prefix);
}

void ROS2PublisherSink::addChannel(const std::string& channel_name, const Schema& schema)
{
Expand Down
1 change: 1 addition & 0 deletions data_tamer_cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ if(gtest_vendor_FOUND AND ament_cmake_gtest_FOUND)
dt_tests.cpp
custom_types_tests.cpp
parser_tests.cpp
ros2_publisher_tests.cpp
trait_tests.cpp)

target_include_directories(datatamer_test
Expand Down
134 changes: 134 additions & 0 deletions data_tamer_cpp/tests/ros2_publisher_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include "data_tamer/data_tamer.hpp"
#include "data_tamer/sinks/ros2_publisher_sink.hpp"

#include <gtest/gtest.h>

#include <string>

using namespace DataTamer;

TEST(DataTamerROS2Publisher, SharedPointer)
{
auto node = std::make_shared<rclcpp::Node>("test_datatamer_shared_pointer");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(node, "test_shared_pointer");

auto channel = ChannelsRegistry::Global().getChannel("channel_shared_pointer");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, SharedPointerLifeCycle)
{
auto lifecycle_node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_"
"datatamer_"
"shared_"
"pointer_"
"lifecycle");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(lifecycle_node, "test_shared_"
"pointer_"
"lifecycle");

auto channel = ChannelsRegistry::Global().getChannel("channel_shared_pointer_"
"lifecycle");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());

lifecycle_node->shutdown();
}

TEST(DataTamerROS2Publisher, Dereference)
{
auto node = std::make_shared<rclcpp::Node>("test_datatamer_dereference");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(*node, "test_dereference");

auto channel = ChannelsRegistry::Global().getChannel("channel_dereference");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, DereferenceLifeCycle)
{
auto lifecycle_node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_"
"datatamer_"
"dereference_"
"lifecycle");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(*lifecycle_node, "test_"
"dereference_"
"lifecycle");

auto channel = ChannelsRegistry::Global().getChannel("channel_dereference_lifecycle");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());

lifecycle_node->shutdown();
}

TEST(DataTamerROS2Publisher, NodeInterfacesDirect)
{
auto node = std::make_shared<rclcpp::Node>("test_datatamer_node_interfaces");
PublisherNodeInterfaces interfaces(*node);
auto ros2_sink = std::make_shared<ROS2PublisherSink>(interfaces, "test_node_"
"interfaces");

auto channel = ChannelsRegistry::Global().getChannel("channel_node_interfaces");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, NodeInterfacesDirectLifeCycle)
{
auto lifecycle_node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_"
"datatamer_"
"node_"
"interfaces_"
"lifecycle");
PublisherNodeInterfaces interfaces(*lifecycle_node);
auto ros2_sink = std::make_shared<ROS2PublisherSink>(interfaces, "test_node_interfaces_"
"lifecycle");

auto channel = ChannelsRegistry::Global().getChannel("channel_node_interfaces_"
"lifecycle");

channel->addDataSink(ros2_sink);

double const value = 1.;
channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());

lifecycle_node->shutdown();
}

int main(int argc, char** argv)
{
rclcpp::init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
rclcpp::shutdown();
return ret;
}
Loading