|
| 1 | +#pragma once |
| 2 | + |
| 3 | +// clang-format off |
| 4 | +/* === MODULE MANIFEST === |
| 5 | +module_name: SharedTopicClient |
| 6 | +module_description: No description provided |
| 7 | +constructor_args: |
| 8 | + - uart_name: "uart_cdc" |
| 9 | + - task_stack_depth: 512 |
| 10 | + - buffer_size: 256 |
| 11 | + - topic_names: |
| 12 | + - "topic1" |
| 13 | + - "topic2" |
| 14 | +required_hardware: uart_name |
| 15 | +=== END MANIFEST === */ |
| 16 | +// clang-format on |
| 17 | + |
| 18 | +#include "app_framework.hpp" |
| 19 | +#include "uart.hpp" |
| 20 | + |
| 21 | +class SharedTopicClient : public LibXR::Application { |
| 22 | + public: |
| 23 | + typedef struct { |
| 24 | + SharedTopicClient *client; |
| 25 | + uint32_t topic_crc32; |
| 26 | + uint32_t index; |
| 27 | + } CallbackInfo; |
| 28 | + |
| 29 | + SharedTopicClient(LibXR::HardwareContainer &hw, |
| 30 | + LibXR::ApplicationManager &app, const char *uart_name, |
| 31 | + uint32_t task_stack_depth, uint32_t buffer_size, |
| 32 | + std::initializer_list<const char *> topic_names) |
| 33 | + : uart_(hw.template Find<LibXR::UART>(uart_name)), |
| 34 | + tx_buffer_(new uint8_t[buffer_size], buffer_size), |
| 35 | + tx_queue_(buffer_size) { |
| 36 | + ASSERT(uart_ != nullptr); |
| 37 | + |
| 38 | + topics_pack_buffer_ = new LibXR::RawData[topic_names.size()]; |
| 39 | + |
| 40 | + uint32_t i = 0; |
| 41 | + |
| 42 | + for (auto name : topic_names) { |
| 43 | + auto ans = LibXR::Topic::Find(name); |
| 44 | + if (ans == nullptr) { |
| 45 | + XR_LOG_ERROR("Topic not found: %s", name); |
| 46 | + ASSERT(false); |
| 47 | + } |
| 48 | + topics_pack_buffer_[i] = LibXR::RawData( |
| 49 | + new uint8_t[ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE], |
| 50 | + ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE); |
| 51 | + |
| 52 | + void (*func)(bool, CallbackInfo, LibXR::RawData &) = |
| 53 | + [](bool in_isr, CallbackInfo info, LibXR::RawData &data) { |
| 54 | + LibXR::WriteOperation op; |
| 55 | + LibXR::Topic::PackData(info.topic_crc32, |
| 56 | + info.client->topics_pack_buffer_[info.index], |
| 57 | + data); |
| 58 | + info.client->tx_queue_.PushBatch( |
| 59 | + static_cast<uint8_t *>( |
| 60 | + info.client->topics_pack_buffer_[info.index].addr_), |
| 61 | + info.client->topics_pack_buffer_[info.index].size_); |
| 62 | + info.client->tx_sem_.PostFromCallback(in_isr); |
| 63 | + }; |
| 64 | + |
| 65 | + auto msg_cb = LibXR::Callback<LibXR::RawData &>::Create( |
| 66 | + func, CallbackInfo{this, ans->data_.crc32, i}); |
| 67 | + |
| 68 | + LibXR::Topic topic(ans); |
| 69 | + |
| 70 | + topic.RegisterCallback(msg_cb); |
| 71 | + |
| 72 | + i++; |
| 73 | + } |
| 74 | + |
| 75 | + tx_thread_.Create(this, TxThreadFun, "SharedTopicClientTxThread", |
| 76 | + task_stack_depth, LibXR::Thread::Priority::MEDIUM); |
| 77 | + |
| 78 | + app.Register(*this); |
| 79 | + } |
| 80 | + |
| 81 | + static void TxThreadFun(SharedTopicClient *client) { |
| 82 | + LibXR::Semaphore write_op_sem; |
| 83 | + LibXR::WriteOperation op(write_op_sem); |
| 84 | + LibXR::WriteOperation op_none; |
| 85 | + while (true) { |
| 86 | + client->tx_sem_.Wait(); |
| 87 | + auto size = |
| 88 | + LibXR::min(client->tx_queue_.Size(), client->tx_buffer_.size_); |
| 89 | + if (size > 0 && client->tx_queue_.PopBatch( |
| 90 | + static_cast<uint8_t *>(client->tx_buffer_.addr_), |
| 91 | + size) == ErrorCode::OK) { |
| 92 | + client->uart_->write_port_( |
| 93 | + {static_cast<uint8_t *>(client->tx_buffer_.addr_), size}, op_none); |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + void OnMonitor() override {} |
| 98 | + |
| 99 | + private: |
| 100 | + LibXR::UART *uart_; |
| 101 | + LibXR::RawData tx_buffer_; |
| 102 | + LibXR::LockFreeQueue<uint8_t> tx_queue_; |
| 103 | + LibXR::RawData *topics_pack_buffer_; |
| 104 | + LibXR::Semaphore tx_sem_; |
| 105 | + LibXR::Thread tx_thread_; |
| 106 | +}; |
0 commit comments