From 091cfb1508db1b21ec6369121e4418412066a3a3 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:12:49 +0200 Subject: [PATCH 01/14] CMake - introduce external clickhouse library --- common/external/CMakeLists.txt | 1 + common/external/clickhouse_cpp.cmake | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 common/external/clickhouse_cpp.cmake diff --git a/common/external/CMakeLists.txt b/common/external/CMakeLists.txt index 81c13d76..9f9ff4ee 100644 --- a/common/external/CMakeLists.txt +++ b/common/external/CMakeLists.txt @@ -8,3 +8,4 @@ include(spdlog.cmake) include(rapidcsv.cmake) include(argparse.cmake) include(xxhash.cmake) +include(clickhouse_cpp.cmake) diff --git a/common/external/clickhouse_cpp.cmake b/common/external/clickhouse_cpp.cmake new file mode 100644 index 00000000..cca3c6ca --- /dev/null +++ b/common/external/clickhouse_cpp.cmake @@ -0,0 +1,18 @@ +# clickhouse-cpp library (C++ client for ClickHouse) +include(FetchContent) + +FetchContent_Declare( + clickhouse_cpp + GIT_REPOSITORY "https://github.com/SiskaPavel/clickhouse-cpp.git" + GIT_TAG "65205a8" + GIT_SHALLOW ON +) + +set(DEBUG_DEPENDENCIES OFF) +set(CLICKHOUSE_INSTALL_TARGETS OFF) + +add_compile_options(-Wno-pedantic -Wno-conversion -Wno-sign-conversion) + +FetchContent_MakeAvailable(clickhouse_cpp) + +add_library(clickhouse_cpp::client ALIAS clickhouse-cpp-lib) From acb883f7330dd3d7bff7eb40f799ff29e1e923fc Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:13:41 +0200 Subject: [PATCH 02/14] CMake - introduce external yaml-cpp library --- common/external/CMakeLists.txt | 1 + common/external/yaml_cpp.cmake | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 common/external/yaml_cpp.cmake diff --git a/common/external/CMakeLists.txt b/common/external/CMakeLists.txt index 9f9ff4ee..d9abcf88 100644 --- a/common/external/CMakeLists.txt +++ b/common/external/CMakeLists.txt @@ -9,3 +9,4 @@ 
include(rapidcsv.cmake) include(argparse.cmake) include(xxhash.cmake) include(clickhouse_cpp.cmake) +include(yaml_cpp.cmake) diff --git a/common/external/yaml_cpp.cmake b/common/external/yaml_cpp.cmake new file mode 100644 index 00000000..ce7b9f2e --- /dev/null +++ b/common/external/yaml_cpp.cmake @@ -0,0 +1,30 @@ +# Yaml-cpp library +# +# yaml-cpp is a YAML parser and emitter in C++ matching the YAML 1.2 spec. +# +# "yaml-cpp" is exposed to be used as a dependency in other CMake targets +# example usage: target_link_libraries(my_target PRIVATE yaml-cpp) + +include(FetchContent) + +FetchContent_Declare( + yaml-cpp + GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git + GIT_TAG f732014 # yaml-cpp-0.8.0 +) + +# Make sure that subproject accepts predefined build options without warnings. +set(CMAKE_POLICY_DEFAULT_CMP0077 NEW) + +# Library does not compile with -Werror that we use in some builds +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ") +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ") +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ") +set(YAML_CPP_BUILD_TESTS OFF) +set(YAML_CPP_BUILD_TOOLS OFF) +set(YAML_CPP_INSTALL OFF) + +FetchContent_MakeAvailable(yaml-cpp) From eb3b5468791bd1c573b99f3eb5f71b7d9aa86b6c Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 14:05:33 +0200 Subject: [PATCH 03/14] Clickhouse - module initialization --- modules/CMakeLists.txt | 1 + modules/clickhouse/CMakeLists.txt | 1 + modules/clickhouse/src/CMakeLists.txt | 15 +++++++++++++++ modules/clickhouse/src/main.cpp | 7 +++++++ 4 files changed, 24 insertions(+) create mode 100644 modules/clickhouse/CMakeLists.txt create mode 100644 
modules/clickhouse/src/CMakeLists.txt create mode 100644 modules/clickhouse/src/main.cpp diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index df464f25..2d84c1c3 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -2,3 +2,4 @@ add_subdirectory(listDetector) add_subdirectory(sampler) add_subdirectory(telemetry) add_subdirectory(deduplicator) +add_subdirectory(clickhouse) diff --git a/modules/clickhouse/CMakeLists.txt b/modules/clickhouse/CMakeLists.txt new file mode 100644 index 00000000..febd4f0a --- /dev/null +++ b/modules/clickhouse/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(src) diff --git a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt new file mode 100644 index 00000000..6fa2150d --- /dev/null +++ b/modules/clickhouse/src/CMakeLists.txt @@ -0,0 +1,15 @@ +add_executable(clickhouse + main.cpp +) + +target_link_libraries(clickhouse PRIVATE + clickhouse_cpp::client + common + unirec::unirec++ + unirec::unirec + trap::trap + argparse + yaml-cpp +) + +install(TARGETS clickhouse DESTINATION ${INSTALL_DIR_BIN}) diff --git a/modules/clickhouse/src/main.cpp b/modules/clickhouse/src/main.cpp new file mode 100644 index 00000000..f27fc4d7 --- /dev/null +++ b/modules/clickhouse/src/main.cpp @@ -0,0 +1,7 @@ +int main(int argc, char** argv) +{ + (void) argc; + (void) argv; + + return 0; +} From 7b79f37fb09b90d9051ba2ccc860b7152a8610ab Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:19:48 +0200 Subject: [PATCH 04/14] Clickhouse - introduce Config Adds config for clickhouse database connection, number of inserter threads, interface data template accepted as input etc. 
--- modules/clickhouse/src/CMakeLists.txt | 1 + modules/clickhouse/src/config.cpp | 206 ++++++++++++++++++++++++++ modules/clickhouse/src/config.hpp | 123 +++++++++++++++ 3 files changed, 330 insertions(+) create mode 100644 modules/clickhouse/src/config.cpp create mode 100644 modules/clickhouse/src/config.hpp diff --git a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt index 6fa2150d..825a13af 100644 --- a/modules/clickhouse/src/CMakeLists.txt +++ b/modules/clickhouse/src/CMakeLists.txt @@ -1,5 +1,6 @@ add_executable(clickhouse main.cpp + config.cpp ) target_link_libraries(clickhouse PRIVATE diff --git a/modules/clickhouse/src/config.cpp b/modules/clickhouse/src/config.cpp new file mode 100644 index 00000000..7ea49d6a --- /dev/null +++ b/modules/clickhouse/src/config.cpp @@ -0,0 +1,206 @@ +/** + * @file config.cpp + * @author Daniel Pelanek + * @brief Parses a YAML config file into the Config structure. Uses yaml-cpp. + * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "config.hpp" +#include "yaml-cpp/yaml.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Remove leading spaces from string. + * + * @param str + */ +static inline void trimLeft(std::string& str) +{ + str.erase(str.begin(), std::find_if(str.begin(), str.end(), [](unsigned char chr) { + return std::isspace(chr) == 0; + })); +} + +/** + * @brief Remove spaces from string on both sides. 
+ * + * @param str + */ +static inline void trim(std::string& str) +{ + str.erase( + std::find_if( + str.rbegin(), + str.rend(), + [](unsigned char chr) { return std::isspace(chr) == 0; }) + .base(), + str.end()); + + str.erase(str.begin(), std::find_if(str.begin(), str.end(), [](unsigned char chr) { + return std::isspace(chr) == 0; + })); +} + +static Config::Endpoint parseEndpoint(const YAML::Node& node) +{ + Config::Endpoint endpoint; + + if (node["host"]) { + endpoint.host = node["host"].as(); + + if (node["port"]) { + endpoint.port = node["port"].as(); + } + } else { + throw std::runtime_error(std::string("Host parameter missing")); + } + + return endpoint; +} + +static void parseEndpoints(const YAML::Node& node, Config& config) +{ + for (const YAML::Node& endpoint : node) { + config.connection.endpoints.push_back(parseEndpoint(endpoint)); + } +} + +/** + * @brief type from unirec template into local enum. + * + */ +static const std::map g_string_to_columntype + = {{"int8", ColumnType::INT8}, {"int8*", ColumnType::INT8_ARR}, + {"int16", ColumnType::INT16}, {"int16*", ColumnType::INT16_ARR}, + {"int32", ColumnType::INT32}, {"int32*", ColumnType::INT32_ARR}, + {"int64", ColumnType::INT64}, {"int64*", ColumnType::INT64_ARR}, + {"uint8", ColumnType::UINT8}, {"uint8*", ColumnType::UINT8_ARR}, + {"uint16", ColumnType::UINT16}, {"uint16*", ColumnType::UINT16_ARR}, + {"uint32", ColumnType::UINT32}, {"uint32*", ColumnType::UINT32_ARR}, + {"uint64", ColumnType::UINT64}, {"uint64*", ColumnType::UINT64_ARR}, + {"char", ColumnType::CHAR}, {"char*", ColumnType::CHAR_ARR}, + {"float", ColumnType::FLOAT}, {"float*", ColumnType::FLOAT_ARR}, + {"double", ColumnType::DOUBLE}, {"double*", ColumnType::DOUBLE_ARR}, + {"ipaddr", ColumnType::IPADDR}, {"ipaddr*", ColumnType::IPADDR_ARR}, + {"macaddr", ColumnType::MACADDR}, {"macaddr*", ColumnType::MACADDR_ARR}, + {"time", ColumnType::TIME}, {"time*", ColumnType::TIME_ARR}, + {"string", ColumnType::STRING}, {"bytes", 
ColumnType::BYTES}}; + +static void parseColumns(const YAML::Node& columnsNode, Config& config) +{ + for (const YAML::Node& col : columnsNode) { + auto colValue = col.as(); + // Type/Name can't have space. Trim leading and trailing spaces. + // Leading spaces for name are trimmed below. + trim(colValue); + + Config::Column column; + size_t const spacePos = colValue.find(' '); + + std::string const type = colValue.substr(0, spacePos); + std::string name = colValue.substr(spacePos + 1); + + try { + column.type = g_string_to_columntype.at(type); + + } catch (std::out_of_range& ex) { + std::stringstream sstream; + sstream << "Incorrect column type: " << colValue.substr(0, spacePos); + throw std::runtime_error(sstream.str()); + } + + trimLeft(name); + column.name = name; + + column.fieldID = 0; + + config.columns.push_back(column); + + // Template stored in input interface format. For ensuring format. + config.templateColumnCsv += type; + config.templateColumnCsv += " "; + config.templateColumnCsv += name; + config.templateColumnCsv += ","; + } + + // Trailing comma + config.templateColumnCsv.pop_back(); +} + +static void parseConnection(const YAML::Node& node, Config& config) +{ + parseEndpoints(node["endpoints"], config); + + if (node["username"] && node["password"] && node["database"] && node["table"]) { + config.connection.user = node["username"].as(); + config.connection.password = node["password"].as(); + config.connection.database = node["database"].as(); + config.connection.table = node["table"].as(); + + } else { + throw std::runtime_error(std::string("Argument in connection missing")); + } +} + +static void parseBlocks(const YAML::Node& node, Config& config) +{ + if (node) { + config.blocks = node.as(); + } +} + +static void parseInserterThreads(const YAML::Node& node, Config& config) +{ + if (node) { + config.inserterThreads = node.as(); + } +} + +static void parseBlockInsertThreshold(const YAML::Node& node, Config& config) +{ + if (node) { + 
config.blockInsertThreshold = node.as(); + } +} + +static void parseBlockInsertMaxDelaySecs(const YAML::Node& node, Config& config) +{ + if (node) { + config.blockInsertMaxDelaySecs = node.as(); + } +} + +static void parseRoot(const YAML::Node& node, Config& config) +{ + parseConnection(node["connection"], config); + parseColumns(node["columns"], config); + + parseBlocks(node["blocks"], config); + parseInserterThreads(node["inserterThreads"], config); + parseBlockInsertThreshold(node["blockInsertThreshold"], config); + parseBlockInsertMaxDelaySecs(node["blockInsertMaxDelaySecs"], config); +} + +Config parseConfig(const std::string& filename) +{ + Config config {}; + + const YAML::Node root = YAML::LoadFile(filename); + + parseRoot(root, config); + + return config; +} diff --git a/modules/clickhouse/src/config.hpp b/modules/clickhouse/src/config.hpp new file mode 100644 index 00000000..02200a47 --- /dev/null +++ b/modules/clickhouse/src/config.hpp @@ -0,0 +1,123 @@ +/** + * @file config.hpp + * @author Daniel Pelanek + * @brief Declares Config struct and function for parsing it. + * Also declares possible types of column. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include + +/** + * @brief Possible unirec column type. 
+ * + */ +enum ColumnType : uint8_t { + INT8_ARR = 0, + INT16_ARR, + INT32_ARR, + INT64_ARR, + + UINT8_ARR, + UINT16_ARR, + UINT32_ARR, + UINT64_ARR, + + CHAR_ARR, + FLOAT_ARR, + DOUBLE_ARR, + IPADDR_ARR, + MACADDR_ARR, + TIME_ARR, + BYTES, + + MACADDR, + + INT8, + INT16, + INT32, + INT64, + + UINT8, + UINT16, + UINT32, + UINT64, + + CHAR, + FLOAT, + DOUBLE, + + IPADDR, + TIME, + STRING, +}; + +/** + * @class Config + * @brief A struct containing all the configurable parameters + */ +struct Config { + static const uint16_t DEFAULT_PORT = 9000; ///< Default port of clickhouse db + static const uint64_t DEFAULT_INSERTER_THREADS = 32; ///< Default num of inserters + static const uint64_t DEFAULT_BLOCKS = 256; ///< Default num of blocks + static const uint64_t DEFAULT_BLOCK_INSERT_THRESHOLD + = 100000; ///< Default num of columns to trigger insert + static const uint64_t DEFAULT_MAX_BLOCK_INSERT_DELAY = 10; ///< Default max time before insert + + /** + * @brief Data from unirec template about column. + * + */ + struct Column { + std::string name; ///< column name + ColumnType type; ///< column type + ur_field_id_t fieldID; ///< column unirec id + }; + + /** + * @brief Endpoint for clickhouse database instance. + * + */ + struct Endpoint { + std::string host; ///< db hostname + uint16_t port = DEFAULT_PORT; ///< db port + }; + + /** + * @brief Contains database endpoints, information for connecting to + * them and into which table in them to insert data. 
+ * + */ + struct Connection { + std::vector endpoints; ///< Endpoints of databases to send to + std::string user; ///< username for connection + std::string password; ///< password for connection + std::string database; ///< database in instance + std::string table; ///< table name in database + }; + + Connection connection; ///< Clickhouse database connection info + std::vector columns; ///< Columns found in config + std::string templateColumnCsv; ///< For comparing with unirec template when it changes + uint64_t inserterThreads = DEFAULT_INSERTER_THREADS; ///< num of inserters + uint64_t blocks = DEFAULT_BLOCKS; ///< Number of blocks for storing + uint64_t blockInsertThreshold + = DEFAULT_BLOCK_INSERT_THRESHOLD; ///< Higher than triggers insertion + uint64_t blockInsertMaxDelaySecs + = DEFAULT_MAX_BLOCK_INSERT_DELAY; ///< Longer than triggers insertion +}; + +/** + * @brief Parse a YAML config file into a structured form + * + * @param filename Path to the YAML config file + * @return The parsed config + */ +Config parseConfig(const std::string& filename); From 5a6ef8eff067c366c811bfb698f149a67a61d962 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:21:37 +0200 Subject: [PATCH 05/14] Clickhouse - introduce Syncqueue needed for communication between manager and inserter in their own threads --- modules/clickhouse/src/syncqueue.hpp | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 modules/clickhouse/src/syncqueue.hpp diff --git a/modules/clickhouse/src/syncqueue.hpp b/modules/clickhouse/src/syncqueue.hpp new file mode 100644 index 00000000..a9feddab --- /dev/null +++ b/modules/clickhouse/src/syncqueue.hpp @@ -0,0 +1,67 @@ +/** + * @file + * @author Michal Sedlak + * @brief SyncQueue class implementation + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include + +/** + * @brief A thread-safe queue + */ +template +class SyncQueue { +public: + /** + * @brief Put an item into the queue + * + * @param item The item + */ + void put(Item item) + { + std::lock_guard const lock(m_mutex); + m_items.push(item); + m_size = m_items.size(); + m_avail_cv.notify_all(); + } + + /** + * @brief Get an item from the queue, block and wait if there aren't any + * + * @return The item + */ + Item get() + { + std::unique_lock lock(m_mutex); + while (true) { + if (!m_items.empty()) { + auto item = m_items.front(); + m_items.pop(); + m_size = m_items.size(); + return item; + } + m_avail_cv.wait(lock); + } + } + + /** + * @brief Get the current size of the queue + * + * @return The number of items in the queue + */ + std::size_t size() const { return m_size; } + +private: + std::atomic_size_t m_size = 0; ///< current number of items in the queue + std::queue m_items; ///< underlying container for queued items + std::mutex m_mutex; ///< mutex for synchronizing access + std::condition_variable m_avail_cv; ///< signals availability of items +}; From ace635b24a33af6cff842a30171f2040068b3e0e Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:22:32 +0200 Subject: [PATCH 06/14] Clickhouse - introduce Synstack needed for communication between manager and inserter in their own threads --- modules/clickhouse/src/syncstack.hpp | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 modules/clickhouse/src/syncstack.hpp diff --git a/modules/clickhouse/src/syncstack.hpp b/modules/clickhouse/src/syncstack.hpp new file mode 100644 index 00000000..0f1c1552 --- /dev/null +++ b/modules/clickhouse/src/syncstack.hpp @@ -0,0 +1,67 @@ +/** + * @file + * @author Michal Sedlak + * @brief SyncStack class implementation + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include + +/** + * @brief A thread-safe stack + */ +template +class SyncStack { +public: + /** + * @brief Push an item onto the stack + * + * @param item The item + */ + void put(Item item) + { + std::lock_guard const lock(m_mutex); + m_items.push(item); + m_size = m_items.size(); + m_avail_cv.notify_all(); + } + + /** + * @brief Pop the top item from the stack, block and wait if there aren't any + * + * @return The item + */ + Item get() + { + std::unique_lock lock(m_mutex); + while (true) { + if (!m_items.empty()) { + auto item = m_items.top(); + m_items.pop(); + m_size = m_items.size(); + return item; + } + m_avail_cv.wait(lock); + } + } + + /** + * @brief Get the current size of the stack + * + * @return The number of items on the stack + */ + std::size_t size() const { return m_size; } + +private: + std::atomic_size_t m_size = 0; ///< current number of items on the stack + std::stack m_items; ///< underlying container for stacked items + std::mutex m_mutex; ///< mutex for synchronizing access + std::condition_variable m_avail_cv; ///< signals availability of items +}; From e24aebbe01aea55eab77d7f514c2627612b36c2a Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:24:28 +0200 Subject: [PATCH 07/14] Clickhouse - introduce wrapper for ClickHouse client include - Suppresses -Wpedantic and -Wsign-conversion warnings - Wraps with GCC diagnostic pragmas --- modules/clickhouse/src/clickhouse.hpp | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 modules/clickhouse/src/clickhouse.hpp diff --git a/modules/clickhouse/src/clickhouse.hpp b/modules/clickhouse/src/clickhouse.hpp new file mode 100644 index 00000000..26a6f030 --- /dev/null +++ b/modules/clickhouse/src/clickhouse.hpp @@ -0,0 +1,5 @@ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" +#pragma GCC diagnostic ignored "-Wsign-conversion" +#include +#pragma GCC diagnostic pop 
From 4202a880244d3615d446866600dda99bfa44e878 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:28:38 +0200 Subject: [PATCH 08/14] Clickhouse - introduce DataType used for converting unirec types and data into clickhouse equivalents --- modules/clickhouse/src/CMakeLists.txt | 1 + modules/clickhouse/src/datatype.cpp | 659 ++++++++++++++++++++++++++ modules/clickhouse/src/datatype.hpp | 150 ++++++ 3 files changed, 810 insertions(+) create mode 100644 modules/clickhouse/src/datatype.cpp create mode 100644 modules/clickhouse/src/datatype.hpp diff --git a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt index 825a13af..0238c3e2 100644 --- a/modules/clickhouse/src/CMakeLists.txt +++ b/modules/clickhouse/src/CMakeLists.txt @@ -1,6 +1,7 @@ add_executable(clickhouse main.cpp config.cpp + datatype.cpp ) target_link_libraries(clickhouse PRIVATE diff --git a/modules/clickhouse/src/datatype.cpp b/modules/clickhouse/src/datatype.cpp new file mode 100644 index 00000000..7d6db5a9 --- /dev/null +++ b/modules/clickhouse/src/datatype.cpp @@ -0,0 +1,659 @@ +/** + * @file datatype.cpp + * @author Daniel Pelanek + * @brief Defines functions for creating column lambdas for creating, writing + * and parsing unirec data into them. 
+ * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "datatype.hpp" + +#include + +template +class ColumnDateTime64 : public clickhouse::ColumnDateTime64 { +public: + ColumnDateTime64() + : clickhouse::ColumnDateTime64(Precision) + { + } +}; + +namespace Getters { + +template +static Value getValue(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Value value = record.getFieldAsType(fieldID); + return value; +} + +template +static std::vector getValueArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const arr = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(arr.size()); + std::copy(arr.begin(), arr.end(), std::back_inserter(result)); + return result; +} + +static std::vector getBytes(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const arr = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(arr.size()); + std::transform(arr.begin(), arr.end(), std::back_inserter(result), [](std::byte value) { + return static_cast(value); + }); + return result; +} + +static in6_addr getIp(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::IpAddress addr = record.getFieldAsType(fieldID); + return *((in6_addr*) &addr.ip); +} + +static std::vector getIpArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const addrArr + = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(addrArr.size()); + std::transform( + addrArr.begin(), + addrArr.end(), + std::back_inserter(result), + [](const Nemea::IpAddress& value) -> in6_addr { + return *reinterpret_cast(&value.ip); + }); + return result; +} + +static std::vector getMac(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::MacAddress const mac = record.getFieldAsType(fieldID); + std::vector result; + const int nMacBytes = 6; + result.reserve(nMacBytes); + std::copy(std::begin(mac.mac.bytes), 
std::end(mac.mac.bytes), std::back_inserter(result)); + return result; +} + +static std::vector> +getMacArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const macArr + = record.getFieldAsUnirecArray(fieldID); + std::vector> result; + result.reserve(macArr.size()); + for (const auto& value : macArr) { + result.emplace_back(); + for (const unsigned char byte : value.mac.bytes) { + result.back().push_back(byte); + } + } + return result; +} + +static uint64_t getTime(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UrTime const time = record.getFieldAsType(fieldID); + const uint64_t nsecInSec = 1000000000; + return ( + (static_cast(ur_time_get_sec(time.time)) * nsecInSec) + + static_cast(ur_time_get_nsec(time.time))); +} + +static std::vector getTimeArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const timeArr + = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(timeArr.size()); + const uint64_t nsecInSec = 1000000000; + std::transform( + timeArr.begin(), + timeArr.end(), + std::back_inserter(result), + [](const Nemea::UrTime& value) -> uint64_t { + return (static_cast(ur_time_get_sec(value.time)) * nsecInSec) + + static_cast(ur_time_get_nsec(value.time)); + }); + return result; +} + +static std::string getString(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + std::string str = record.getFieldAsType(fieldID); + return str; +} +} // namespace Getters + +template +struct DataTypeTraits {}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt16; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt16"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct 
DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt16; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int16"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt16)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; 
+ static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int16)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnFloat32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Float32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Float32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnFloat64; + static 
constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Float64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Float64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnIPv6; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "IPv6"; + static constexpr auto GETTER = &Getters::getIp; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(IPv6)"; + static constexpr auto GETTER = &Getters::getIpArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getMac; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT>; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Array(UInt8))"; + static constexpr auto GETTER = &Getters::getMacArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = ColumnDateTime64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "DateTime64(9)"; + static constexpr auto GETTER = &Getters::getTime; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT>; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(DateTime64(9))"; + static constexpr auto GETTER = &Getters::getTimeArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnString; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "String"; + static constexpr auto GETTER = &Getters::getString; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view 
CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getBytes; +}; + +template +static void visit(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8: + func(DataTypeTraits {}); + break; + case ColumnType::INT16: + func(DataTypeTraits {}); + break; + case ColumnType::INT32: + func(DataTypeTraits {}); + break; + case ColumnType::INT64: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME: + func(DataTypeTraits {}); + break; + case ColumnType::STRING: + func(DataTypeTraits {}); + break; + case ColumnType::INT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR_ARR: + func(DataTypeTraits {}); + break; + case 
ColumnType::TIME_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::BYTES: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +template +static void visitNonArr(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8: + func(DataTypeTraits {}); + break; + case ColumnType::INT16: + func(DataTypeTraits {}); + break; + case ColumnType::INT32: + func(DataTypeTraits {}); + break; + case ColumnType::INT64: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME: + func(DataTypeTraits {}); + break; + case ColumnType::STRING: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +template +static void visitArr(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE_ARR: + 
func(DataTypeTraits {}); + break; + case ColumnType::IPADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::BYTES: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +static bool isArr(ColumnType type) +{ + return type < ColumnType::INT8; +} + +static std::shared_ptr makeArrColumn(ColumnType type) +{ + std::shared_ptr column; + visit(type, [&](auto traits) { + using ColType = typename decltype(traits)::ColumnType; + column = std::make_shared(); + }); + + return column; +} + +static std::shared_ptr makeNonArrColumn(ColumnType type) +{ + std::shared_ptr column; + visit(type, [&](auto traits) { + using ColType = clickhouse::ColumnNullableT; + column = std::make_shared(); + }); + + return column; +} + +std::shared_ptr makeColumn(ColumnType type) +{ + if (isArr(type)) { + return makeArrColumn(type); + } + return makeNonArrColumn(type); +} + +GetterFn makeGetter(ColumnType type) +{ + GetterFn getter; + visit(type, [&](auto traits) { + getter = [](Nemea::UnirecRecordView& record, ur_field_id_t fieldID, ValueVariant& value) { + value = decltype(traits)::GETTER(record, fieldID); + }; + }); + return getter; +} + +static ColumnWriterFn makeArrColumnwriter(ColumnType type) +{ + ColumnWriterFn columnwriter; + + visitArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = typename decltype(traits)::ColumnType; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (value) { + col->Append(std::get(*value)); + } + }; + }); + + return columnwriter; +} + +static ColumnWriterFn makeNonArrColumnwriter(ColumnType type) +{ + ColumnWriterFn 
columnwriter; + + visitNonArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = clickhouse::ColumnNullableT; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (!value) { + col->Append(std::nullopt); + } else { + col->Append(std::get(*value)); + } + }; + }); + + return columnwriter; +} + +ColumnWriterFn makeColumnwriter(ColumnType type) +{ + if (isArr(type)) { + return makeArrColumnwriter(type); + } + return makeNonArrColumnwriter(type); +} + +std::string typeToClickhouse(ColumnType type) +{ + std::string result; + visit(type, [&](auto traits) { result = traits.CLICKHOUSE_TYPE_NAME; }); + return result; +} diff --git a/modules/clickhouse/src/datatype.hpp b/modules/clickhouse/src/datatype.hpp new file mode 100644 index 00000000..7ec39f71 --- /dev/null +++ b/modules/clickhouse/src/datatype.hpp @@ -0,0 +1,150 @@ +/** + * @file datatype.hpp + * @author Daniel Pelanek + * @brief Functions specific to column data + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "config.hpp" + +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief All possible types of parsed values sent into clickhouse. + * + */ +using ValueVariant = std::variant< + int8_t, + std::vector, + int16_t, + std::vector, + int32_t, + std::vector, + int64_t, + std::vector, + uint8_t, + std::vector, + uint16_t, + std::vector, + uint32_t, + std::vector, + uint64_t, + std::vector, + float, + std::vector, + double, + std::vector, + std::vector, + std::vector>, + in6_addr, + std::vector, + std::string>; + +/** + * @brief Lambda for converting unirec column data to clickhouse column. 
+ * + */ +using GetterFn = std::function< + void(Nemea::UnirecRecordView& record, ur_field_id_t fieldID, ValueVariant& value)>; + +/** + * @brief Lambda for writing value into clickhouse column. + * + */ +using ColumnWriterFn = std::function; + +/** + * @brief Lambda for creating clickhouse columns. + * + */ +using ColumnFactoryFn = std::function()>; + +/** + * @brief Column specification. + * + * Contains: + * type, name, unirec field id. + * Helper lambdas for creating the column, loading a value from unirec + * and writing the loaded value to clickhouse. + * + */ +struct ColumnCtx { + std::string name; ///< Column name + ColumnType type; ///< Column type + ur_field_id_t fieldID; ///< unirec template field id + + ColumnFactoryFn columnFactory = nullptr; ///< lambda for creating columns + GetterFn getter = nullptr; ///< lambda for converting unirec data to clickhouse column + ColumnWriterFn columnWriter = nullptr; ///< lambda for writing column value + + bool hasValue = false; ///< If a value was stored to column + ValueVariant valueBuffer; ///< Stored value +}; + +/** + * @brief Block of columns handed to an inserter. + * + */ +struct BlockCtx { + /** + * @brief Vector of column data to be inserted into ClickHouse. + */ + std::vector> columns; + + /** + * @brief ClickHouse block structure used for insertion. + */ + clickhouse::Block block; + + /** + * @brief Number of rows in the block. + */ + unsigned int rows; +}; + +/** + * @brief for clickhouse. 9 is nanoseconds. 
+ */ +const int g_TIME_PRECISION = 9; + +/** + * @brief Make a ClickHouse column that is able to store values of the supplied data type + * + * @param type The data type + * @return The ClickHouse column object + */ +std::shared_ptr makeColumn(ColumnType type); + +/** + * @brief Makes a function (lambda) which converts unirec column data into clickhouse column + * + * @param type The data type + * @return The Getter function + */ +GetterFn makeGetter(ColumnType type); + +/** + * @brief Makes a function (lambda) which writes a stored value into a clickhouse column + * + * @param type The data type + * @return The column writer function + */ +ColumnWriterFn makeColumnwriter(ColumnType type); + +/** + * @brief Converts Columntype into clickhouse string specification of column + * + * @param type The data type + * @return The ClickHouse type name + */ +std::string typeToClickhouse(ColumnType type); From 045a03a30dbcfdc87761f85c9210bf63d39a11ae Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:30:43 +0200 Subject: [PATCH 09/14] Clickhouse - introduce Inserter Inserters for inserting blocks into clickhouse database. 
--- modules/clickhouse/src/CMakeLists.txt | 1 + modules/clickhouse/src/inserter.cpp | 274 ++++++++++++++++++++++++++ modules/clickhouse/src/inserter.hpp | 112 +++++++++++ 3 files changed, 387 insertions(+) create mode 100644 modules/clickhouse/src/inserter.cpp create mode 100644 modules/clickhouse/src/inserter.hpp diff --git a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt index 0238c3e2..d79bda10 100644 --- a/modules/clickhouse/src/CMakeLists.txt +++ b/modules/clickhouse/src/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(clickhouse main.cpp config.cpp datatype.cpp + inserter.cpp ) target_link_libraries(clickhouse PRIVATE diff --git a/modules/clickhouse/src/inserter.cpp b/modules/clickhouse/src/inserter.cpp new file mode 100644 index 00000000..3247ef82 --- /dev/null +++ b/modules/clickhouse/src/inserter.cpp @@ -0,0 +1,274 @@ +/** + * @file inserter.cpp + * @author Daniel Pelanek + * @brief Defines inserter methods and helper functions. + * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "inserter.hpp" + +#include +#include + +static constexpr int g_ERR_TABLE_NOT_FOUND = 60; + +/** + * @brief Clickhouse column description + * + */ +struct ColumnDescription { + std::string name; // clickhouse column name + std::string type; // clickhouse data type as string +}; + +/** + * @brief Extracts all column descriptions from a clickhouse block + * + * @param block + * @return std::vector + */ +static std::vector extractBlockDescription(const clickhouse::Block& block) +{ + std::vector columnDescriptions; + + const std::size_t rowCount = block.GetRowCount(); + + if (block.GetColumnCount() < 2 || rowCount == 0) { + return columnDescriptions; + } + + const uint8_t columnNameIndex = 0; + const uint8_t columnTypeIndex = 1; + + const auto& nameColumns = block[columnNameIndex]->As(); + const auto& typeColumns = block[columnTypeIndex]->As(); + + columnDescriptions.reserve(block.GetRowCount()); + + for (std::size_t rowIndex = 0; rowIndex < 
rowCount; rowIndex++) { + const ColumnDescription columnDescription { + std::string(nameColumns->At(rowIndex)), + std::string(typeColumns->At(rowIndex))}; + columnDescriptions.emplace_back(columnDescription); + } + + return columnDescriptions; +} + +static std::vector +selectTableDescription(clickhouse::Client& client, const std::string& table) +{ + std::vector columnDescriptions; + const auto selectCallback = [&](const clickhouse::Block& block) { + auto partial = extractBlockDescription(block); + columnDescriptions.insert( + columnDescriptions.end(), + std::make_move_iterator(partial.begin()), + std::make_move_iterator(partial.end())); + }; + + const std::string query = "DESCRIBE TABLE " + table; + + client.Select(query, selectCallback); + + return columnDescriptions; +} + +/** + * @brief Describes table predefined in clickhouse database. + * + * @param client + * @param table name from config + * @return std::vector> + */ +static std::vector +describeTable(clickhouse::Client& client, const std::string& tableName) +{ + std::vector description; + + try { + return selectTableDescription(client, tableName); + + } catch (const clickhouse::ServerException& exc) { + if (exc.GetCode() == g_ERR_TABLE_NOT_FOUND) { + std::stringstream sstream; + sstream << "Table " << tableName << " does not exist."; + throw std::runtime_error(sstream.str()); + } + throw; + } + + return description; +} + +/** + * @brief Compares clickhouse schema to the one defined in config. 
+ * + * @param client + * @param table name from config + * @param columns initialized based on config + */ +static void ensureSchema( + clickhouse::Client& client, + const std::string& table, + const std::vector& columns) +{ + // Load clickhouse columns + auto dbColumns = describeTable(client, table); + + auto schemaHint = [&]() { + std::stringstream sstream; + sstream << "hint:\n"; + sstream << "CREATE TABLE " << table << "(\n"; + size_t columnIndex = 0; + for (const auto& column : columns) { + const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type); + sstream << " \"" << column.name << "\" " << clickhouseType + << (columnIndex < columns.size() - 1 ? "," : "") << '\n'; + columnIndex++; + } + sstream << ");"; + return sstream.str(); + }; + + if (columns.size() != dbColumns.size()) { + std::stringstream sstream; + sstream << "Config has " << columns.size() << " columns but table \"" << table << "\" has " + << dbColumns.size() << "\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + + for (size_t i = 0; i < dbColumns.size(); i++) { + const auto& expectedName = columns[i].name; + const auto& expectedType = typeToClickhouse(columns[i].type); + const auto& [actual_name, actual_type] = dbColumns[i]; + + if (expectedName != actual_name) { + std::stringstream sstream; + sstream << "Expected column #" << i << " in table \"" << table << "\" to be named \"" + << expectedName << "\" but it is \"" << actual_name << "\"\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + + if (expectedType != actual_type) { + std::stringstream sstream; + sstream << "Expected column #" << i << " in table \"" << table << "\" to be of type \"" + << expectedType << "\" but it is \"" << actual_type << "\"\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + } +} + +Inserter::Inserter( + int inserterId, + std::shared_ptr logger, + clickhouse::ClientOptions clientOpts, + const std::vector& columns, + const std::string& table, + 
SyncQueue& filledBlocks, + SyncStack& emptyBlocks) + + : m_id(inserterId) + , m_logger(std::move(logger)) + , m_client_opts(std::move(clientOpts)) + , m_columns(columns) + , m_table(table) + , m_filled_blocks(filledBlocks) + , m_empty_blocks(emptyBlocks) +{ +} + +void Inserter::start() +{ + m_thread = std::thread([this]() { + try { + run(); + } catch (...) { + m_exception = std::current_exception(); + m_errored = true; + } + }); +} + +void Inserter::insert(clickhouse::Block& block) +{ + bool needsReconnect = false; + while (!m_stop_signal) { + try { + if (needsReconnect) { + m_client->ResetConnectionEndpoint(); + ensureSchema(*m_client, m_table, m_columns); + m_logger->warn( + "[Worker {}}] Connected to {}:{} due to error with previous endpoint", + m_id, + m_client->GetCurrentEndpoint()->host.c_str(), + m_client->GetCurrentEndpoint()->port); + } + + m_client->Insert(m_table, block); + break; + + } catch (const std::exception& ex) { + m_logger->error( + "[Worker {}] Insert failed: {} - retrying in 1 second", + m_id, + ex.what()); + needsReconnect = true; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void Inserter::run() +{ + m_client = std::make_unique(m_client_opts); + ensureSchema(*m_client, m_table, m_columns); + auto endpoint = m_client->GetCurrentEndpoint(); + if (endpoint) { + m_logger + ->info("[Worker {}] Connected to {}:{}", m_id, endpoint->host.c_str(), endpoint->port); + } else { + m_logger->warn("[Worker {}] Connected, but endpoint is not available.", m_id); + } + + while (!m_stop_signal) { + BlockCtx* block = m_filled_blocks.get(); + if (block == nullptr) { + // we might get null as a way to get unblocked and process stop signal + continue; + } + + block->block.RefreshRowCount(); + insert(block->block); + + for (auto& column : block->columns) { + column->Clear(); + } + + block->rows = 0; + m_empty_blocks.put(block); + } +} + +void Inserter::stop() +{ + m_stop_signal = true; +} + +void Inserter::join() +{ + m_thread.join(); +} + 
+void Inserter::checkError() +{ + if (m_errored) { + std::rethrow_exception(m_exception); + } +} diff --git a/modules/clickhouse/src/inserter.hpp b/modules/clickhouse/src/inserter.hpp new file mode 100644 index 00000000..a9911b12 --- /dev/null +++ b/modules/clickhouse/src/inserter.hpp @@ -0,0 +1,112 @@ +/** + * @file inserter.hpp + * @author Daniel Pelanek + * @brief Declares Inserter class for clickhouse module + * and the Noncopyable/Nonmoveable helper base classes. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "datatype.hpp" +#include "logger/logger.hpp" +#include "syncqueue.hpp" +#include "syncstack.hpp" + +#include +#include +#include + +/** + * @brief Noncopyable for inheritance. + * + */ +class Noncopyable { +public: + Noncopyable() = default; // Default constructor is fine + Noncopyable(const Noncopyable&) = delete; // Delete copy constructor + Noncopyable& operator=(const Noncopyable&) = delete; // Delete copy assignment operator +}; + +/** + * @brief Nonmoveable for inheritance. + * + */ +class Nonmoveable { +public: + Nonmoveable() = default; // Default constructor is fine + Nonmoveable(Nonmoveable&&) = delete; // Delete move constructor + Nonmoveable& operator=(Nonmoveable&&) = delete; // Delete move assignment operator +}; + +/** + * @brief A worker class responsible for inserting data into a ClickHouse table. 
+ * + */ +class Inserter + : Nonmoveable + , Noncopyable { +public: + /** + * @brief Instantiate an inserter instance + * + * @param inserterId id + * @param logger The logger + * @param clientOpts The Clickhouse client options + * @param columns The column definitions + * @param table The table to insert the data into + * @param filledBlocks A queue of blocks ready to be sent + * @param emptyBlocks A queue of blocks that have been sent and are able to be reused + */ + Inserter( + int inserterId, + std::shared_ptr logger, + clickhouse::ClientOptions clientOpts, + const std::vector& columns, + const std::string& table, + SyncQueue& filledBlocks, + SyncStack& emptyBlocks); + + /** + * @brief Start the inserter thread + */ + void start(); + + /** + * @brief Stop the inserter thread + */ + void stop(); + + /** + * @brief Wait for the inserter thread to stop + */ + void join(); + + /** + * @brief Check if the inserter thread has encountered an error, and if so, rethrow the captured + * exception + */ + void checkError(); + +private: + int m_id; ///< unique thread or task identifier + std::shared_ptr m_logger; ///< logging utility reference + std::thread m_thread; ///< worker thread + std::atomic_bool m_stop_signal = false; ///< signals thread to stop + std::atomic_bool m_errored = false; ///< indicates if an error occurred + std::exception_ptr m_exception = nullptr; ///< stores exception thrown in thread + + clickhouse::ClientOptions m_client_opts; ///< ClickHouse client configuration + const std::vector& m_columns; ///< defines ClickHouse table schema + const std::string& m_table; ///< target ClickHouse table name + SyncQueue& m_filled_blocks; ///< queue of blocks ready to insert + SyncStack& m_empty_blocks; ///< stack of reusable empty blocks + + std::unique_ptr m_client; ///< ClickHouse client instance + + void connect(); ///< establishes connection to ClickHouse + void run(); ///< thread entry point + void insert(clickhouse::Block& block); ///< inserts a block into 
ClickHouse +}; From b89bb92f9c248f1fe460622d810c893549bed41e Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 10:32:51 +0200 Subject: [PATCH 10/14] clickhouse - introduce Manager Manager takes care of handling inserters, processing unirec data with datatype and storing it into blocks. Edited main for this and also to check if the template in input interface matches template specified in config --- modules/clickhouse/src/CMakeLists.txt | 1 + modules/clickhouse/src/manager.cpp | 190 ++++++++++++++++++++++++++ modules/clickhouse/src/manager.hpp | 80 +++++++++++ 3 files changed, 271 insertions(+) create mode 100644 modules/clickhouse/src/manager.cpp create mode 100644 modules/clickhouse/src/manager.hpp diff --git a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt index d79bda10..3534cc5e 100644 --- a/modules/clickhouse/src/CMakeLists.txt +++ b/modules/clickhouse/src/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable(clickhouse config.cpp datatype.cpp inserter.cpp + manager.cpp ) target_link_libraries(clickhouse PRIVATE diff --git a/modules/clickhouse/src/manager.cpp b/modules/clickhouse/src/manager.cpp new file mode 100644 index 00000000..fb4037aa --- /dev/null +++ b/modules/clickhouse/src/manager.cpp @@ -0,0 +1,190 @@ +/** + * @file manager.cpp + * @author Daniel Pelanek + * @brief Defines manager methods and helper function. + * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "manager.hpp" +#include "datatype.hpp" + +#include +#include +#include +#include + +/** + * @brief Initializes columns from config and corresponding lambdas for handling them. + * + * @param columns_cfg Columns from config + * @return std::vector Prepared columns for sending to clickhouse. 
+ */ +static std::vector prepareColumns(const std::vector& columnsCfg) +{ + std::vector columns; + + for (const auto& columnCfg : columnsCfg) { + ColumnCtx column {}; + + column.name = columnCfg.name; + column.type = columnCfg.type; + column.fieldID = columnCfg.type; // NOTE(review): stores the column type, not a field id; presumably a placeholder until updateFieldIDs() runs - confirm + + column.getter = makeGetter(columnCfg.type); + column.columnWriter = makeColumnwriter(columnCfg.type); + column.columnFactory = [=]() { return makeColumn(columnCfg.type); }; + + columns.emplace_back(std::move(column)); + } + + return columns; +} + +Manager::Manager(Config config) + : M_CONFIG(std::move(config)) +{ + m_columns = prepareColumns(M_CONFIG.columns); + + std::vector endpoints; + endpoints.reserve(M_CONFIG.connection.endpoints.size()); + std::transform( + M_CONFIG.connection.endpoints.begin(), + M_CONFIG.connection.endpoints.end(), + std::back_inserter(endpoints), + [](const Config::Endpoint& epCfg) { + return clickhouse::Endpoint {epCfg.host, epCfg.port}; + }); + + // Prepare blocks: every block gets one freshly created column per configured column + m_logger->info("Preparing {} blocks", M_CONFIG.blocks); + for (unsigned int i = 0; i < M_CONFIG.blocks; i++) { + m_blocks.emplace_back(std::make_unique()); + BlockCtx& block = *m_blocks.back().get(); + for (const auto& column : m_columns) { + block.columns.emplace_back(column.columnFactory()); + block.block.AppendColumn(column.name, block.columns.back()); + } + m_empty_blocks.put(&block); + } + + // Prepare inserters: all of them share the filled-block queue and the empty-block stack + m_logger->info("Preparing {} inserter threads", M_CONFIG.inserterThreads); + for (unsigned int i = 0; i < M_CONFIG.inserterThreads; i++) { + auto clientOpts = clickhouse::ClientOptions() + .SetEndpoints(endpoints) + .SetUser(M_CONFIG.connection.user) + .SetPassword(M_CONFIG.connection.password) + .SetDefaultDatabase(M_CONFIG.connection.database); + + m_inserters.emplace_back(std::make_unique( + m_inserters.size() + 1, + m_logger, + clientOpts, + m_columns, + M_CONFIG.connection.table, + m_filled_blocks, + m_empty_blocks)); + } + + // Start inserter threads + m_logger->info("Starting inserter 
threads"); + for (auto& inserter : m_inserters) { + inserter->start(); + } + + m_logger->info("Clickhouse plugin is ready"); +} + +void Manager::processRecord(Nemea::UnirecRecordView& record) +{ + // Get new empty block if there is no current block + if (m_current_block == nullptr) { + m_current_block = m_empty_blocks.get(); + } + + for (ColumnCtx& ctx : m_columns) { + ctx.getter(record, ctx.fieldID, ctx.valueBuffer); + ctx.hasValue = true; + } + + for (size_t i = 0; i < m_columns.size(); i++) { + m_columns[i].columnWriter( + m_columns[i].hasValue ? &m_columns[i].valueBuffer : nullptr, + *m_current_block->columns[i].get()); + } + + m_current_block->rows++; + + std::time_t const now = std::time(nullptr); + if (m_start_time == 0) { + m_start_time = now; + m_last_insert_time = now; + m_last_stats_print_time = now; + } + + // Send the block for insertion if it is sufficiently full or a block hasn't been sent in a long + // enough time + if (m_current_block->rows >= M_CONFIG.blockInsertThreshold + || (uint64_t(now - m_last_insert_time) >= M_CONFIG.blockInsertMaxDelaySecs + && m_current_block->rows > 0)) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + m_last_insert_time = now; + } + + // Check for any exceptions was thrown by the inserter threads + for (auto& inserter : m_inserters) { + inserter->checkError(); + } +} + +void Manager::updateFieldIDs() +{ + // Export what's left in the last block + if ((m_current_block != nullptr) && m_current_block->rows > 0) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + } + + for (auto& column : m_columns) { + column.fieldID = static_cast(ur_get_id_by_name(column.name.c_str())); + + if (column.fieldID == UR_E_INVALID_NAME) { + printf("Invalid field name: %s\n", column.name.c_str()); + } + } + + m_logger->info("Updated field ids"); +} + +Config Manager::getConfig() const +{ + return this->M_CONFIG; +} + +void Manager::stop() +{ + // Export what's left in the last block + if 
((m_current_block != nullptr) && m_current_block->rows > 0) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + } + + // Stop all the threads and wait for them to finish + m_logger->info("Sending stop signal to inserter threads..."); + for (auto& inserter : m_inserters) { + inserter->stop(); + } + for (const auto& inserter : m_inserters) { + (void) inserter; + // Wake up the inserter threads in case they are waiting on a .get() + m_filled_blocks.put(nullptr); + } + + m_logger->info("Waiting for inserter threads to finish..."); + for (auto& inserter : m_inserters) { + inserter->join(); + } +} diff --git a/modules/clickhouse/src/manager.hpp b/modules/clickhouse/src/manager.hpp new file mode 100644 index 00000000..1b3f7b6c --- /dev/null +++ b/modules/clickhouse/src/manager.hpp @@ -0,0 +1,80 @@ +/** + * @file manager.hpp + * @author Daniel Pelanek + * @brief Declares Manager class for clickhouse module + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "datatype.hpp" +#include "inserter.hpp" + +#include + +/** + * @brief Converts Unirec records to clickhouse format, buffers them and + * sends them through inserters. + * + * This class should be instantiated only once. It owns blocks of columns + * and inserters. It also owns synced data structures which keep track of currently + * filled blocks. It fills the blocks inside processRecord and inserters take fully + * filled blocks through sync queue by themselves to send. + * + */ +class Manager + : Nonmoveable + , Noncopyable { +public: + /** + * @brief Instantiate the manager instance + * + * @param config Config instance parsed in main. + */ + explicit Manager(Config config); + + /** + * @brief Stop the plugin and wait till it is stopped (blocking). + */ + void stop(); + + /** + * @brief Takes unirec record, converts it to clickhouse format and stores it. 
+ * Adds to filled blocks if a block was sufficiently filled or none were sent + * in a specified time frame (m_config.block_insert_max_delay_secs). + * + * @param record Unirec record view to parse + */ + void processRecord(Nemea::UnirecRecordView& record); + + /** + * @brief changes unirec ids of fields after getting template in main. + * + */ + void updateFieldIDs(); + + /** + * @brief Returns config specified by argument to program. + * + * @return Config + */ + Config getConfig() const; + +private: + const Config M_CONFIG; ///< application configuration + std::shared_ptr m_logger + = Nm::loggerGet("manager"); ///< logging utility reference + std::vector m_columns; ///< ClickHouse table schema definition + + BlockCtx* m_current_block = nullptr; ///< pointer to the currently filling block + std::vector> m_inserters; ///< inserter worker instances + std::vector> m_blocks; ///< owned memory blocks + SyncStack m_empty_blocks; ///< pool of empty blocks for reuse + SyncQueue m_filled_blocks; ///< queue of blocks ready for insertion + + std::time_t m_start_time = 0; ///< application start time + std::time_t m_last_stats_print_time = 0; ///< last stats print timestamp + std::time_t m_last_insert_time = 0; ///< last data insert timestamp +}; From c8212409e4bba2d97cbb746de67ed97ee4d5fdcd Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 21:35:04 +0200 Subject: [PATCH 11/14] Clickhouse - introduce main.cpp Integrate module into main file --- modules/clickhouse/src/main.cpp | 174 +++++++++++++++++++++++++++++++- 1 file changed, 171 insertions(+), 3 deletions(-) diff --git a/modules/clickhouse/src/main.cpp b/modules/clickhouse/src/main.cpp index f27fc4d7..6c9917b6 100644 --- a/modules/clickhouse/src/main.cpp +++ b/modules/clickhouse/src/main.cpp @@ -1,7 +1,175 @@ +/** + * @file main.cpp + * @author Daniel Pelanek + * @brief Clickhouse Module: resend flowdata to clickhouse + * + * This file contains the main function and supporting functions for the Unirec 
Clickhouse Module. + * This module takes Unirec records from a unidirectional interface, converts them to + * Clickhouse format, buffers them and then sends them to the Clickhouse server specified in config. + * It utilizes the Unirec++ library for record handling, argparse for command-line argument parsing, + * yaml-cpp for config parsing (per README the config is YAML; verify against config.cpp) and clickhouse cpp library. + * Ported from ipfixcol2 clickhouse plugin. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "clickhouse.hpp" +#include "config.hpp" +#include "logger/logger.hpp" +#include "manager.hpp" + +#include +#include +#include +#include +#include + +using namespace Nemea; + +static std::atomic g_stopFlag(false); + +static void signalHandler(int signum) +{ + auto logger = Nm::loggerGet("main"); + logger->info("Interrupt signal {} received", signum); // NOTE(review): logging is presumably not async-signal-safe; consider only setting the flag here + g_stopFlag.store(true); +} + +/** + * @brief Handle format change exception by adjusting the template and check template + * against the one defined in config. + * + * This function is called when a `FormatChangeException` is caught in the main loop. + * It adjusts the template in the input interface to handle the format change but in this + * case the program only continues if the template is the same as defined in config. Meaning + * it only continues if the template changes to the same one. + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. 
+ */ +static void handleFormatChange(UnirecInputInterface& interface, Manager& manager) +{ + interface.changeTemplate(); + + ur_template_t* changedTemplate = interface.getTemplate(); + auto res = std::unique_ptr( + ur_template_string_delimiter(changedTemplate, ','), + &free); + + const Config cfg = manager.getConfig(); + + if (cfg.templateColumnCsv != res.get()) { + throw std::runtime_error( + "Template in input interface doesn't match template in configuration."); + } + + manager.updateFieldIDs(); +} + +/** + * @brief Receive the next unirec record (if any) and hand it to the manager. + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. + */ +static void processNextRecord(UnirecInputInterface& interface, Manager& manager) +{ + std::optional unirecRecord = interface.receive(); + if (!unirecRecord) { + return; + } + + manager.processRecord(*unirecRecord); +} + +/** + * @brief Process Unirec records. + * + * The `processUnirecRecords` function continuously receives Unirec records through the provided + * input interface (`interface`). Each received record is processed, buffered and + * then sent to a clickhouse database. + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. + */ +static void processUnirecRecords(UnirecInputInterface& interface, Manager& manager) +{ + while (!g_stopFlag.load()) { + try { + processNextRecord(interface, manager); + } catch (FormatChangeException& ex) { + handleFormatChange(interface, manager); + } catch (EoFException& ex) { + break; + } catch (std::exception& ex) { + throw; + } + } +} + int main(int argc, char** argv) { - (void) argc; - (void) argv; + argparse::ArgumentParser program("Unirec Clickhouse"); + + program.add_argument("-c", "--config") + .required() + .help("specify the YAML config file. 
Format is in readme.") + .metavar("yaml_file"); + + Unirec unirec({1, 0, "clickhouse", "Unirec clickhouse module"}); + + Nm::loggerInit(); + auto logger = Nm::loggerGet("main"); + + signal(SIGINT, signalHandler); + + try { + unirec.init(argc, argv); + } catch (HelpException& ex) { + std::cerr << program; + return EXIT_SUCCESS; + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + program.parse_args(argc, argv); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + Config config; + try { + config = parseConfig(program.get("--config")); + + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + std::unique_ptr manager; + try { + manager = std::make_unique(config); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + UnirecInputInterface interface = unirec.buildInputInterface(); + + processUnirecRecords(interface, *manager); + + logger->info("Input processing finished, stopping manager"); + manager->stop(); + } catch (std::exception& ex) { + logger->error(ex.what()); + // join the inserter threads; destroying a still-joinable std::thread calls std::terminate + manager->stop(); + return EXIT_FAILURE; + } - return 0; + return EXIT_SUCCESS; } From 8e35a716c6464b6ccda1dd3cfcc5ff700840d03f Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 17:23:11 +0200 Subject: [PATCH 12/14] Clickhouse - Adjust RPM package build --- pkg/rpm/nemea-modules-ng.spec.in | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rpm/nemea-modules-ng.spec.in b/pkg/rpm/nemea-modules-ng.spec.in index bb06d98e..eb74a5f5 100644 --- a/pkg/rpm/nemea-modules-ng.spec.in +++ b/pkg/rpm/nemea-modules-ng.spec.in @@ -36,6 +36,7 @@ that make up the main components of the test environment. 
%files %license LICENSE +%{_bindir}/nemea/clickhouse %{_bindir}/nemea/listDetector %{_bindir}/nemea/sampler %{_bindir}/nemea/telemetry_stats From 14a76c0cd9d2d12ebd9d143d5970340069560172 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 14:07:08 +0200 Subject: [PATCH 13/14] Clickhouse - add README.md --- modules/clickhouse/README.md | 165 +++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 modules/clickhouse/README.md diff --git a/modules/clickhouse/README.md b/modules/clickhouse/README.md new file mode 100644 index 00000000..cc3f319a --- /dev/null +++ b/modules/clickhouse/README.md @@ -0,0 +1,165 @@ +# clickhouse output module +Converts Unirec records into clickhouse format and stores them in the database. +- When multiple database endpoints are specified, data is sent only to one of them. +By default it is the first one and the others are used if the previous ones fail. + +## Interfaces +- Input: 1 +- Output: 0 + +## Parameters +### Common TRAP parameters +- `-h [trap,1]` Print help message for this module / for libtrap specific parameters. +- `-i IFC_SPEC` Specification of interface types and their parameters. +- `-v` Be verbose. +- `-vv` Be more verbose. +- `-vvv` Be even more verbose. + +### Module specific parameters +- `-c, --config FILE` YAML config file specifying connection parameters and data columns. + +## Usage +The module expects the ClickHouse database to already contain the table with +appropriate schema corresponding to the configuration entered. The existence +and schema of the table are checked after initiating connection to the database +and an error is displayed if there is a mismatch. The table is not +automatically created.
+ +### Unirec to clickhouse type conversion +| Unirec | Clickhouse | | Unirec | Clickhouse | +|---------|---------------|-|----------|----------------------| +| int8 | Int8 | | int8* | Array(Int8) | +| int16 | Int16 | | int16* | Array(Int16) | +| int32 | Int32 | | int32* | Array(Int32) | +| int64 | Int64 | | int64* | Array(Int64) | +| uint8 | UInt8 | | uint8* | Array(UInt8) | +| uint16 | UInt16 | | uint16* | Array(UInt16) | +| uint32 | UInt32 | | uint32* | Array(UInt32) | +| uint64 | UInt64 | | uint64* | Array(UInt64) | +| char | UInt8 | | char* | Array(UInt8) | +| float | Float32 | | float* | Array(Float32) | +| double | Float64 | | double* | Array(Float64) | +| ipaddr | IPv6 | | ipaddr* | Array(IPv6) | +| macaddr | Array(UInt8) | | macaddr* | Array(Array(UInt8)) | +| time | DateTime64(9) | | time* | Array(DateTime64(9)) | +| string | String | | | | +| bytes | Array(UInt8) | | | | + +### Clickhouse database and table creation example +```SQL +CREATE DATABASE IF NOT EXISTS clickhouse; +CREATE TABLE clickhouse.flows( + "DST_IP" IPv6, + "SRC_IP" IPv6, + "BYTES" UInt64, + "BYTES_REV" UInt64, + "LINK_BIT_FIELD" UInt64, + "TIME_FIRST" DateTime64(9), + "TIME_LAST" DateTime64(9), + "PACKETS" UInt32, + "PACKETS_REV" UInt32, + "DST_PORT" UInt16, + "SRC_PORT" UInt16, + "FLOW_END_REASON" UInt8, + "PROTOCOL" UInt8, + "TCP_FLAGS" UInt8, + "TCP_FLAGS_REV" UInt8, + "IDP_CONTENT" Array(UInt8), + "IDP_CONTENT_REV" Array(UInt8), + "PPI_PKT_DIRECTIONS" Array(Int8), + "PPI_PKT_FLAGS" Array(UInt8), + "TLS_JA3_FINGERPRINT" Array(UInt8), + "TLS_SNI" String, + "PPI_PKT_LENGTHS" Array(UInt16), + "DBI_BRST_BYTES" Array(UInt32), + "DBI_BRST_PACKETS" Array(UInt32), + "D_PHISTS_IPT" Array(UInt32), + "D_PHISTS_SIZES" Array(UInt32), + "SBI_BRST_BYTES" Array(UInt32), + "SBI_BRST_PACKETS" Array(UInt32), + "S_PHISTS_IPT" Array(UInt32), + "S_PHISTS_SIZES" Array(UInt32), + "DBI_BRST_TIME_START" Array(DateTime64(9)), + "DBI_BRST_TIME_STOP" Array(DateTime64(9)), + "PPI_PKT_TIMES" Array(DateTime64(9)), 
+ "SBI_BRST_TIME_START" Array(DateTime64(9)), + "SBI_BRST_TIME_STOP" Array(DateTime64(9)) +) +ENGINE = MergeTree +ORDER BY TIME_FIRST +``` + +## Configuration +YAML config + +### Config specification +| Parameter | Description | Default | +|-----------|-------------|---------| +| **connection** | The database connection parameters. | | +| connection.endpoints | The possible endpoints data can be sent to, i.e., all the replicas of a particular shard. In case one endpoint is unreachable, another one is used. | | +| connection.endpoints.endpoint | Connection parameters of one endpoint. | | +| connection.endpoints.endpoint.host | The ClickHouse database host as a domain name or an IP address. | | +| connection.endpoints.endpoint.port | The port of the ClickHouse database. | 9000 | +| connection.username | The database username. | | +| connection.password | The database password. | | +| connection.database | The database name where the specified table is present. | | +| connection.table | The name of the table to insert the data into. | | +| **blocks** | Number of data blocks in circulation. Each block is de-facto a memory buffer that the rows are written to before being sent out to the ClickHouse database. | 64 | +| **inserterThreads** | Number of threads used for data insertion to ClickHouse. In other words, the number of ClickHouse connections that are concurrently used. | 8 | +| **blockInsertThreshold** | Number of rows to be buffered into a block before the block is sent out to be inserted into the database. | 100000 | +| **blockInsertMaxDelaySecs** | Maximum number of seconds to wait before a block gets sent out to be inserted into the database even if the threshold has not been reached yet. | 10 | +| **columns** | List of fields which each row consists of. It is in unirec template format. 
([TYPE] [NAME]) | | + + +### Example configuration +```YAML +connection: + endpoints: + - host: localhost + port: 9000 + username: clickhouse + password: clickhouse + database: clickhouse + table: flows + +inserterThreads: 32 +blocks: 1024 +blockInsertThreshold: 100000 + +columns: + - ipaddr DST_IP + - ipaddr SRC_IP + - uint64 BYTES + - uint64 BYTES_REV + - uint64 LINK_BIT_FIELD + - time TIME_FIRST + - time TIME_LAST + - uint32 PACKETS + - uint32 PACKETS_REV + - uint16 DST_PORT + - uint16 SRC_PORT + - uint8 FLOW_END_REASON + - uint8 PROTOCOL + - uint8 TCP_FLAGS + - uint8 TCP_FLAGS_REV + - bytes IDP_CONTENT + - bytes IDP_CONTENT_REV + - int8* PPI_PKT_DIRECTIONS + - uint8* PPI_PKT_FLAGS + - bytes TLS_JA3_FINGERPRINT + - string TLS_SNI + - uint16* PPI_PKT_LENGTHS + - uint32* DBI_BRST_BYTES + - uint32* DBI_BRST_PACKETS + - uint32* D_PHISTS_IPT + - uint32* D_PHISTS_SIZES + - uint32* SBI_BRST_BYTES + - uint32* SBI_BRST_PACKETS + - uint32* S_PHISTS_IPT + - uint32* S_PHISTS_SIZES + - time* DBI_BRST_TIME_START + - time* DBI_BRST_TIME_STOP + - time* PPI_PKT_TIMES + - time* SBI_BRST_TIME_START + - time* SBI_BRST_TIME_STOP +``` From 76d1cb5375f7875b221efe45596babe40cd6e0f0 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 1 Sep 2025 16:56:49 +0200 Subject: [PATCH 14/14] Update README.md --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 80841610..be2a1a53 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ This repository contains basic modules of the [NEMEA system](https://github.com/CESNET/Nemea). The modules and their functionality/purposes are: -* [ListDetector](modules/listdetector/): forwards records that match rules list. +* [Clickhouse](modules/clickhouse/): converts unirec into clickhouse DB. +* [Deduplicator](modules/deduplicator/): omit duplicate records. +* [ListDetector](modules/listDetector/): forwards records that match rules list. 
* [Sampler](modules/sampler/): sample records at the given rate. * [Telemetry](modules/telemetry/): provides unirec telemetry of the input interface. -* [Deduplicator](modules/deduplicator/): omit duplicate records.