diff --git a/src/stirling/source_connectors/file_source/file_source_connector.cc b/src/stirling/source_connectors/file_source/file_source_connector.cc
index 112c472ce05..dcbd31114a6 100644
--- a/src/stirling/source_connectors/file_source/file_source_connector.cc
+++ b/src/stirling/source_connectors/file_source/file_source_connector.cc
@@ -85,6 +85,37 @@ StatusOr<BackedDataElements> DataElementsFromCSV(std::ifstream& file_name) {
   return BackedDataElements(0);
 }
 
+// Schema for ".tetragon" files. The columns are fixed, so no input stream is needed: the event's
+// "time" and "node_name" (kept as strings), the event type, the event payload as JSON text, and a
+// uuid generated per row.
+StatusOr<BackedDataElements> DataElementsFromTetragon() {
+  BackedDataElements data_elements(5);
+  data_elements.emplace_back("time_", "", types::DataType::STRING);
+  data_elements.emplace_back("node_name", "", types::DataType::STRING);
+  data_elements.emplace_back("type", "", types::DataType::STRING);
+  data_elements.emplace_back("payload", "", types::DataType::STRING);
+  data_elements.emplace_back("uuid", "", types::DataType::UINT128);
+  return data_elements;
+}
+
+// Schema for ".kubescape" files: one string column per top-level JSON key copied by
+// TransferDataFromKubescape(), in the same order, followed by a uuid generated per row.
+StatusOr<BackedDataElements> DataElementsFromKubescape() {
+  BackedDataElements data_elements(11);
+  data_elements.emplace_back("time_", "", types::DataType::STRING);
+  data_elements.emplace_back("level", "", types::DataType::STRING);
+  data_elements.emplace_back("RuleID", "", types::DataType::STRING);
+  data_elements.emplace_back("message", "", types::DataType::STRING);
+  data_elements.emplace_back("msg", "", types::DataType::STRING);
+  data_elements.emplace_back("event", "", types::DataType::STRING);
+  data_elements.emplace_back("BaseRuntimeMetadata", "", types::DataType::STRING);
+  data_elements.emplace_back("CloudMetadata", "", types::DataType::STRING);
+  data_elements.emplace_back("RuntimeK8sDetails", "", types::DataType::STRING);
+  data_elements.emplace_back("RuntimeProcessDetails", "", types::DataType::STRING);
+  data_elements.emplace_back("uuid", "", types::DataType::UINT128);
+  return data_elements;
+}
+
 StatusOr<BackedDataElements> DataElementsForUnstructuredFile() {
   BackedDataElements data_elements(3);
   data_elements.emplace_back("time_", "", types::DataType::TIME64NS);
@@ -110,6 +141,10 @@ StatusOr<std::pair<BackedDataElements, std::ifstream>> DataElementsFromFile(
     PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromCSV(f));
   } else if (extension == ".json") {
     PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromJSON(f));
+  } else if (extension == ".tetragon") {
+    PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromTetragon());
+  } else if (extension == ".kubescape") {
+    PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromKubescape());
   } else {
     if (allow_unstructured) {
       LOG(WARNING) << absl::Substitute("Unsupported file type: $0, treating each line as a single column", extension);
@@ -151,6 +186,8 @@ FileSourceConnector::FileSourceConnector(std::string_view source_name,
       transfer_specs_({
           {".json", {&FileSourceConnector::TransferDataFromJSON}},
           {".csv", {&FileSourceConnector::TransferDataFromCSV}},
+          {".tetragon", {&FileSourceConnector::TransferDataFromTetragon}},
+          {".kubescape", {&FileSourceConnector::TransferDataFromKubescape}},
           {"", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
           {".log", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
       }) {}
@@ -168,6 +205,105 @@ Status FileSourceConnector::StopImpl() {
 
 constexpr int kMaxLines = 1000;
 
+// Transfers one line of a ".tetragon" file into a row of the DataElementsFromTetragon() schema.
+// The line must be a JSON object. "time" and "node_name" are copied when they are strings; the
+// first remaining key names the event type and its value is re-serialized as the payload. Lines
+// that are not JSON objects, or that have no such key, are logged and dropped. `nanos` is unused:
+// it is only there so the signature matches the other transfer functions in transfer_specs_.
+void FileSourceConnector::TransferDataFromTetragon(DataTable::DynamicRecordBuilder* /*r*/,
+                                                   uint64_t /*nanos*/, const std::string& line) {
+  rapidjson::Document doc;
+  doc.Parse(line.c_str());
+  if (doc.HasParseError() || !doc.IsObject()) {
+    LOG(ERROR) << "Invalid JSON line: " << line;
+    return;
+  }
+
+  // Falls back to the literal "empty" when the key is absent or its value is not a string.
+  auto string_or_empty = [&doc](const char* key) -> std::string {
+    const auto member = doc.FindMember(key);
+    if (member == doc.MemberEnd() || !member->value.IsString()) {
+      return "empty";
+    }
+    return member->value.GetString();
+  };
+
+  // Find the event type: the first key that is neither "time" nor "node_name".
+  std::string_view type;
+  const rapidjson::Value* payload = nullptr;
+  for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
+    const std::string_view key = it->name.GetString();
+    if (key != "time" && key != "node_name") {
+      type = key;
+      payload = &it->value;
+      break;
+    }
+  }
+  if (type.empty() || payload == nullptr) {
+    LOG(WARNING) << "⚠️ Skipping line, no extra field found.";
+    return;
+  }
+
+  // Serialize the payload back to a JSON string.
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  payload->Accept(writer);
+
+  // Only start the record once every check has passed, so a skipped line never leaves a row with
+  // some columns appended and others missing.
+  DataTable::DynamicRecordBuilder r(data_tables_[0]);
+  r.Append(0, types::StringValue(string_or_empty("time")), kMaxStringBytes);
+  r.Append(1, types::StringValue(string_or_empty("node_name")), kMaxStringBytes);
+  r.Append(2, types::StringValue(std::string(type)), kMaxStringBytes);
+  r.Append(3, types::StringValue(buffer.GetString()), kMaxStringBytes);
+  const sole::uuid u = sole::uuid4();
+  r.Append(4, types::UInt128Value(u.ab, u.cd));
+}
+
+// Transfers one line of a ".kubescape" file into a row of the DataElementsFromKubescape() schema.
+// The line must be a JSON object, otherwise it is logged and dropped. Each key in kColumnKeys
+// fills one string column: string values are copied as-is, any other value is serialized back to
+// JSON text, and a key that is missing from the line yields the literal "empty" (indexing a
+// missing key with operator[] trips rapidjson's assertion). `nanos` is unused: it is only there
+// so the signature matches the other transfer functions in transfer_specs_.
+void FileSourceConnector::TransferDataFromKubescape(DataTable::DynamicRecordBuilder* /*r*/,
+                                                    uint64_t /*nanos*/, const std::string& line) {
+  // Top-level JSON keys in column order; keep in sync with DataElementsFromKubescape().
+  static constexpr const char* kColumnKeys[] = {
+      "time", "level", "RuleID", "message", "msg", "event", "BaseRuntimeMetadata",
+      "CloudMetadata", "RuntimeK8sDetails", "RuntimeProcessDetails",
+  };
+
+  rapidjson::Document doc;
+  doc.Parse(line.c_str());
+  if (doc.HasParseError() || !doc.IsObject()) {
+    LOG(ERROR) << "❌ Invalid JSON: " << line;
+    return;
+  }
+
+  // Strings are returned as-is; every other JSON value is serialized to its JSON text.
+  auto stringify = [](const rapidjson::Value& val) -> std::string {
+    if (val.IsString()) {
+      return val.GetString();
+    }
+    rapidjson::StringBuffer buf;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
+    val.Accept(writer);
+    return buf.GetString();
+  };
+
+  DataTable::DynamicRecordBuilder r(data_tables_[0]);
+  size_t col = 0;
+  for (const char* key : kColumnKeys) {
+    const auto member = doc.FindMember(key);
+    const std::string text =
+        member == doc.MemberEnd() ? std::string("empty") : stringify(member->value);
+    r.Append(col++, types::StringValue(text), kMaxStringBytes);
+  }
+  const sole::uuid u = sole::uuid4();
+  r.Append(col, types::UInt128Value(u.ab, u.cd));
+}
+
 void FileSourceConnector::TransferDataFromJSON(DataTable::DynamicRecordBuilder* /*r*/,
                                                uint64_t nanos, const std::string& line) {
   rapidjson::Document d;
diff --git a/src/stirling/source_connectors/file_source/file_source_connector.h b/src/stirling/source_connectors/file_source/file_source_connector.h
index 1525327a652..6fdee59f73b 100644
--- a/src/stirling/source_connectors/file_source/file_source_connector.h
+++ b/src/stirling/source_connectors/file_source/file_source_connector.h
@@ -63,6 +63,10 @@ class FileSourceConnector : public SourceConnector {
                             const std::string& line);
   void TransferDataFromCSV(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
                            const std::string& line);
+  void TransferDataFromTetragon(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
+                                const std::string& line);
+  void TransferDataFromKubescape(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
+                                 const std::string& line);
 
   struct FileTransferSpec {
     std::function DataElementsFromJSON(std::ifstream& f_stream);
 StatusOr<BackedDataElements> DataElementsFromCSV(std::ifstream& f_stream);
 StatusOr<BackedDataElements> DataElementsForUnstructuredFile();
+StatusOr<BackedDataElements> DataElementsFromTetragon();
+StatusOr<BackedDataElements> DataElementsFromKubescape();
 
 }  // namespace stirling
 }  // namespace px