extend filesource to handle files with tetragon and kubescape extension #10

base: main
```diff
@@ -85,6 +85,32 @@ StatusOr<BackedDataElements> DataElementsFromCSV(std::ifstream& file_name) {
   return BackedDataElements(0);
 }
 
+StatusOr<BackedDataElements> DataElementsFromTetragon() {
+  BackedDataElements data_elements(5);
+  data_elements.emplace_back("time_", "", types::DataType::STRING);
+  data_elements.emplace_back("node_name", "", types::DataType::STRING);
+  data_elements.emplace_back("type", "", types::DataType::STRING);
+  data_elements.emplace_back("payload", "", types::DataType::STRING);
+  data_elements.emplace_back("uuid", "", types::DataType::UINT128);
+  return data_elements;
+}
+
+StatusOr<BackedDataElements> DataElementsFromKubescape() {
+  BackedDataElements data_elements(11);
+  data_elements.emplace_back("time_", "", types::DataType::STRING);
+  data_elements.emplace_back("level", "", types::DataType::STRING);
+  data_elements.emplace_back("RuleID", "", types::DataType::STRING);
+  data_elements.emplace_back("message", "", types::DataType::STRING);
+  data_elements.emplace_back("msg", "", types::DataType::STRING);
+  data_elements.emplace_back("event", "", types::DataType::STRING);
+  data_elements.emplace_back("BaseRuntimeMetadata", "", types::DataType::STRING);
+  data_elements.emplace_back("CloudMetadata", "", types::DataType::STRING);
+  data_elements.emplace_back("RuntimeK8sDetails", "", types::DataType::STRING);
+  data_elements.emplace_back("RuntimeProcessDetails", "", types::DataType::STRING);
+  data_elements.emplace_back("uuid", "", types::DataType::UINT128);
+  return data_elements;
+}
+
 StatusOr<BackedDataElements> DataElementsForUnstructuredFile() {
   BackedDataElements data_elements(3);
   data_elements.emplace_back("time_", "", types::DataType::TIME64NS);
```
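For context, a hypothetical example of a single line of Tetragon's JSON export (field values invented for illustration): each line is one object with three root keys, `time`, `node_name`, and the event type, which is what the 5-column schema above maps onto.

```cpp
// Hypothetical sample of one Tetragon JSON line (values are made up).
// The three root keys map onto the time_, node_name, and type/payload
// columns declared in DataElementsFromTetragon().
constexpr char kSampleTetragonLine[] =
    R"({"process_exec":{"process":{"pid":42,"binary":"/usr/bin/curl"}},)"
    R"("node_name":"kind-control-plane","time":"2024-01-15T12:34:56.789Z"})";
```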
```diff
@@ -110,6 +136,10 @@ StatusOr<std::pair<BackedDataElements, std::ifstream>> DataElementsFromFile(
     PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromCSV(f));
   } else if (extension == ".json") {
     PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromJSON(f));
+  } else if (extension == ".tetragon") {
+    PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromTetragon());
+  } else if (extension == ".kubescape") {
+    PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromKubescape());
   } else {
     if (allow_unstructured) {
       LOG(WARNING) << absl::Substitute("Unsupported file type: $0, treating each line as a single column", extension);
```
```diff
@@ -151,6 +181,8 @@ FileSourceConnector::FileSourceConnector(std::string_view source_name,
       transfer_specs_({
           {".json", {&FileSourceConnector::TransferDataFromJSON}},
           {".csv", {&FileSourceConnector::TransferDataFromCSV}},
+          {".tetragon", {&FileSourceConnector::TransferDataFromTetragon}},
+          {".kubescape", {&FileSourceConnector::TransferDataFromKubescape}},
           {"", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
           {".log", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
       }) {}
```
```diff
@@ -168,6 +200,101 @@ Status FileSourceConnector::StopImpl() {
+
+constexpr int kMaxLines = 1000;
+
+void FileSourceConnector::TransferDataFromTetragon(DataTable::DynamicRecordBuilder* /*r*/,
+                                                   const std::string& line) {
+  DataTable::DynamicRecordBuilder r(data_tables_[0]);
+  rapidjson::Document doc;
+  doc.Parse(line.c_str());
+
+  if (doc.HasParseError() || !doc.IsObject()) {
+    LOG(ERROR) << "Invalid JSON line: " << line;
+    return;
+  }
+
+  // Extract "time".
+  std::string timeStr = "empty";
+  if (doc.HasMember("time") && doc["time"].IsString()) {
+    timeStr = doc["time"].GetString();
+  }
+  r.Append(0, types::StringValue(timeStr), kMaxStringBytes);
```
+215
to
+219
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what situations will
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, in regular workflow, it wont be. I will remove default |
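A minimal sketch of what that follow-up could look like, assuming the default is removed: a hypothetical helper that returns `std::nullopt` for a missing or non-string field, so the caller can skip and log the line instead of writing an "empty" placeholder.

```cpp
#include <optional>
#include <string>

#include <rapidjson/document.h>

// Hypothetical helper: return std::nullopt when a field is missing or not a
// string, so the caller can skip the line rather than store a placeholder.
std::optional<std::string> GetStringField(const rapidjson::Document& doc,
                                          const char* key) {
  if (!doc.HasMember(key) || !doc[key].IsString()) {
    return std::nullopt;
  }
  return std::string(doc[key].GetString());
}
```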
```diff
+
+  // Extract "node_name".
+  std::string nodeNameStr = "empty";
+  if (doc.HasMember("node_name") && doc["node_name"].IsString()) {
+    nodeNameStr = doc["node_name"].GetString();
+  }
+  r.Append(1, types::StringValue(nodeNameStr), kMaxStringBytes);
```
> **Comment on lines +221 to +226**
>
> **Reviewer:** I think we should consider how to extend the existing JSON handling here. I don't think this should block you from moving forward with this depending on the priorities, but I think it would be possible to enhance it to cover these formats.
```diff
+
+  // Find the actual event type (the key that isn't "time" or "node_name").
+  std::string typeStr;
+  const rapidjson::Value* payloadVal = nullptr;
+
+  for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
+    std::string key = it->name.GetString();
+    if (key != "time" && key != "node_name") {
+      typeStr = key;
+      payloadVal = &it->value;
+      break;
```
> **Comment on lines +234 to +237**
>
> **Reviewer:** This feels dangerous: depending on the sort order of rapidjson's object iteration. Can you explain in more detail how this type field is identified and why it won't be known up front?
>
> **Author (Collaborator):** As far as I know, Tetragon lines always have 3 root keys: `time`, `node_name`, and a third one that alternates within a fixed list: `process_exec`, `process_exit`, `process_kprobe`, `process_uprobe`, `process_tracepoint`, `process_loader`. I agree that if there were a 4th property, it would be dangerous. Maybe I can check the key against that fixed list.
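The check the author proposes could look like the following minimal sketch (constant and function names are hypothetical): accept the third root key only if it is one of the Tetragon event types listed in the comment above, rather than relying on member iteration order. Newer Tetragon versions may add more event types, so the list would need to be kept up to date.

```cpp
#include <algorithm>
#include <array>
#include <string_view>

// Known Tetragon event types, taken from the author's comment above.
constexpr std::array<std::string_view, 6> kTetragonEventTypes = {
    "process_exec",   "process_exit",       "process_kprobe",
    "process_uprobe", "process_tracepoint", "process_loader"};

// Returns true only for keys in the fixed allowlist, so an unexpected 4th
// root key cannot be mistaken for the event payload.
bool IsKnownTetragonEventType(std::string_view key) {
  return std::find(kTetragonEventTypes.begin(), kTetragonEventTypes.end(),
                   key) != kTetragonEventTypes.end();
}
```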
```diff
+    }
+  }
+
+  if (typeStr.empty() || payloadVal == nullptr) {
+    LOG(WARNING) << "⚠️ Skipping line, no extra field found.";
+    return;
+  }
+
+  r.Append(2, types::StringValue(typeStr), kMaxStringBytes);
+
+  // Serialize the payload back to a string.
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  payloadVal->Accept(writer);
+  r.Append(3, types::StringValue(buffer.GetString()), kMaxStringBytes);
+  sole::uuid u = sole::uuid4();
+  r.Append(4, types::UInt128Value(u.ab, u.cd));
+  return;
+}
```
```diff
+void FileSourceConnector::TransferDataFromKubescape(DataTable::DynamicRecordBuilder* /*r*/,
+                                                    const std::string& line) {
+  DataTable::DynamicRecordBuilder r(data_tables_[0]);
+
+  rapidjson::Document doc;
+  doc.Parse(line.c_str());
+  if (doc.HasParseError() || !doc.IsObject()) {
+    LOG(ERROR) << "❌ Invalid JSON: " << line;
+    return;
+  }
+
+  // Helper to convert a RapidJSON member to a string. Missing keys yield
+  // "empty" rather than hitting undefined behavior in operator[].
+  auto stringify = [](const rapidjson::Document& json, const char* key) -> std::string {
+    if (!json.HasMember(key)) {
+      return "empty";
+    }
+    const rapidjson::Value& val = json[key];
+    if (val.IsString()) {
+      return val.GetString();
+    }
+    rapidjson::StringBuffer buf;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
+    val.Accept(writer);
+    return buf.GetString();
+  };
+
+  // Fill each column. The order must match DataElementsFromKubescape().
+  r.Append(0, types::StringValue(stringify(doc, "time")), kMaxStringBytes);
+  r.Append(1, types::StringValue(stringify(doc, "level")), kMaxStringBytes);
+  r.Append(2, types::StringValue(stringify(doc, "RuleID")), kMaxStringBytes);
+  r.Append(3, types::StringValue(stringify(doc, "message")), kMaxStringBytes);
+  r.Append(4, types::StringValue(stringify(doc, "msg")), kMaxStringBytes);
+  r.Append(5, types::StringValue(stringify(doc, "event")), kMaxStringBytes);
+  r.Append(6, types::StringValue(stringify(doc, "BaseRuntimeMetadata")), kMaxStringBytes);
+  r.Append(7, types::StringValue(stringify(doc, "CloudMetadata")), kMaxStringBytes);
+  r.Append(8, types::StringValue(stringify(doc, "RuntimeK8sDetails")), kMaxStringBytes);
+  r.Append(9, types::StringValue(stringify(doc, "RuntimeProcessDetails")), kMaxStringBytes);
+  sole::uuid u = sole::uuid4();
+  r.Append(10, types::UInt128Value(u.ab, u.cd));
```
> **Comment on lines +280 to +292**
>
> **Reviewer:** This is fine to use, but it seems brittle in the event that the schema changes (hard-coded column offsets, hard-coded column names, etc.). Is there a material difference between this and the existing JSON handling?
>
> **Author (Collaborator):** Actually, for Kubescape the current FileSource JSON path is able to process it; I separated it out by file extension. But if you recommend using just the `.json` extension for Kubescape and not implementing it separately for now, I am okay with that 😄 I just wanted to show you explicitly what we are trying to do.
```diff
+
+  return;
+}
+
```
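One way to address the brittleness flagged above would be to derive both the schema and the column offsets from a single shared field list, so names and offsets cannot drift apart. A sketch, with a hypothetical constant name; `stringify` refers to the helper in the diff above:

```cpp
#include <array>

// Hypothetical shared field list, read by both DataElementsFromKubescape()
// (one STRING column per entry, plus the trailing uuid column) and
// TransferDataFromKubescape().
constexpr std::array<const char*, 10> kKubescapeFields = {
    "time",    "level", "RuleID",              "message",
    "msg",     "event", "BaseRuntimeMetadata", "CloudMetadata",
    "RuntimeK8sDetails", "RuntimeProcessDetails"};

// The transfer side can then loop instead of hard-coding offsets 0-9:
//   for (size_t i = 0; i < kKubescapeFields.size(); ++i) {
//     r.Append(i, types::StringValue(stringify(doc, kKubescapeFields[i])),
//              kMaxStringBytes);
//   }
```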
```diff
 void FileSourceConnector::TransferDataFromJSON(DataTable::DynamicRecordBuilder* /*r*/,
                                                uint64_t nanos, const std::string& line) {
   rapidjson::Document d;
```
> **Reviewer:** Is there a reason you didn't use a `types::DataType::TIME64NS` column? You might run into compatibility issues (PxL, etc.) later since it differs from how all of Pixie's other tables work (every data table has a `time_` TIME64NS column).
>
> In addition, `types::DataType::TIME64NS` is significantly more memory efficient than `types::DataType::STRING`: the former is a fixed-size 64-bit value, while the latter is variable length (roughly 1 byte for every character in the timestamp string).
>
> Separately, have you tested this end to end? I ask because our in-memory database requires a proper time column, if I remember correctly. I've seen issues in the past where a query never completes in these cases. When I experienced this, I think the data table in question didn't have any `time_` column, so maybe it wouldn't manifest the same way.
>
> **Author (Collaborator):** Ah, it was my mistake. I was trying to assign the time string in ISO format from the log. I overlooked `time_` vs `time`. I am going to differentiate the naming.
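A minimal sketch of the change the reviewer is suggesting, assuming Abseil's time utilities are available (Pixie already depends on Abseil): parse the RFC 3339 timestamp string from the event once, and store the result in a column declared as `types::DataType::TIME64NS` instead of `STRING`. The function name is hypothetical.

```cpp
#include <cstdint>
#include <string>

#include <absl/time/time.h>

// Hypothetical helper: parse an RFC 3339 timestamp (the format Tetragon
// emits, e.g. "2024-01-15T12:34:56.789Z") into Unix nanoseconds so it can be
// appended to a TIME64NS column. Returns false if the string does not parse,
// letting the caller skip or log the line.
bool ParseEventTimeNanos(const std::string& time_str, int64_t* nanos_out) {
  absl::Time t;
  std::string err;
  if (!absl::ParseTime(absl::RFC3339_full, time_str, &t, &err)) {
    return false;
  }
  *nanos_out = absl::ToUnixNanos(t);
  return true;
}
```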