127 changes: 127 additions & 0 deletions src/stirling/source_connectors/file_source/file_source_connector.cc
@@ -85,6 +85,32 @@ StatusOr<BackedDataElements> DataElementsFromCSV(std::ifstream& file_name) {
return BackedDataElements(0);
}

StatusOr<BackedDataElements> DataElementsFromTetragon() {
BackedDataElements data_elements(5);
data_elements.emplace_back("time_", "", types::DataType::STRING);
data_elements.emplace_back("node_name", "", types::DataType::STRING);
data_elements.emplace_back("type", "", types::DataType::STRING);
data_elements.emplace_back("payload", "", types::DataType::STRING);
data_elements.emplace_back("uuid", "", types::DataType::UINT128);
return data_elements;
}

StatusOr<BackedDataElements> DataElementsFromKubescape() {
BackedDataElements data_elements(11);
data_elements.emplace_back("time_", "", types::DataType::STRING);
Review comment:
Is there a reason you didn't use a types::DataType::TIME64NS column? You might run into compatibility issues (PxL, etc.) later since it differs from how all of Pixie's other tables are defined (every data table has a time_ TIME64NS column).

In addition to this, types::DataType::TIME64NS is significantly more memory efficient than types::DataType::STRING. The former is a fixed-size 64-bit value, while the latter is variable length (~1 byte per character of the timestamp string).

Separately, have you tested this end to end? I ask because our in-memory database requires a proper time column, if I remember correctly.

I've seen issues in the past where a query never completes in these cases. When I experienced this, I think the data table in question didn't have any time_ column, so maybe it wouldn't manifest the same.

Author reply:
Ah, it was my mistake. I was trying to assign the time string in ISO format from the log, and I overlooked the difference between time_ and time. I am going to differentiate the naming.
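If the time_ column is switched to TIME64NS as suggested, Tetragon's RFC 3339 time string has to be converted to nanoseconds since the Unix epoch. Below is a minimal, dependency-free sketch of that conversion; ParseIso8601ToNanos and DaysFromCivil are hypothetical helper names, and the PR would presumably use Pixie's existing time utilities instead.

```cpp
#include <cstdint>
#include <cstdio>
#include <string>

// Days between 1970-01-01 and y-m-d in the proleptic Gregorian calendar
// (Howard Hinnant's days_from_civil algorithm).
int64_t DaysFromCivil(int64_t y, int64_t m, int64_t d) {
  y -= m <= 2;
  const int64_t era = (y >= 0 ? y : y - 399) / 400;
  const int64_t yoe = y - era * 400;
  const int64_t doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;
  const int64_t doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
  return era * 146097 + doe - 719468;
}

// Hypothetical helper (not part of the PR): converts an RFC 3339 UTC
// timestamp such as "2024-01-02T03:04:05.123456789Z" to nanoseconds since
// the Unix epoch, suitable for a time_ TIME64NS column. Returns -1 on
// parse failure.
int64_t ParseIso8601ToNanos(const std::string& ts) {
  int y, mo, d, h, mi, s, consumed = 0;
  if (std::sscanf(ts.c_str(), "%d-%d-%dT%d:%d:%d%n", &y, &mo, &d, &h, &mi, &s,
                  &consumed) != 6) {
    return -1;
  }
  int64_t frac = 0;
  const char* p = ts.c_str() + consumed;
  if (*p == '.') {
    ++p;
    int64_t scale = 100000000;  // First fractional digit is worth 10^8 ns.
    for (; *p >= '0' && *p <= '9'; ++p) {
      frac += (*p - '0') * scale;
      scale /= 10;
    }
  }
  const int64_t secs = DaysFromCivil(y, mo, d) * 86400 + h * 3600 + mi * 60 + s;
  return secs * 1000000000LL + frac;
}
```

A production version would also validate field ranges and handle numeric time-zone offsets; the sketch assumes UTC ("Z") input.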

data_elements.emplace_back("level", "", types::DataType::STRING);
data_elements.emplace_back("RuleID", "", types::DataType::STRING);
data_elements.emplace_back("message", "", types::DataType::STRING);
data_elements.emplace_back("msg", "", types::DataType::STRING);
data_elements.emplace_back("event", "", types::DataType::STRING);
data_elements.emplace_back("BaseRuntimeMetadata", "", types::DataType::STRING);
data_elements.emplace_back("CloudMetadata", "", types::DataType::STRING);
data_elements.emplace_back("RuntimeK8sDetails", "", types::DataType::STRING);
data_elements.emplace_back("RuntimeProcessDetails", "", types::DataType::STRING);
data_elements.emplace_back("uuid", "", types::DataType::UINT128);
return data_elements;
}

StatusOr<BackedDataElements> DataElementsForUnstructuredFile() {
BackedDataElements data_elements(3);
data_elements.emplace_back("time_", "", types::DataType::TIME64NS);
@@ -110,6 +136,10 @@ StatusOr<std::pair<BackedDataElements, std::ifstream>> DataElementsFromFile(
PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromCSV(f));
} else if (extension == ".json") {
PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromJSON(f));
} else if (extension == ".tetragon") {
PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromTetragon());
} else if (extension == ".kubescape") {
PX_ASSIGN_OR_RETURN(data_elements, DataElementsFromKubescape());
} else {
if (allow_unstructured) {
LOG(WARNING) << absl::Substitute("Unsupported file type: $0, treating each line as a single column", extension);
@@ -151,6 +181,8 @@ FileSourceConnector::FileSourceConnector(std::string_view source_name,
transfer_specs_({
{".json", {&FileSourceConnector::TransferDataFromJSON}},
{".csv", {&FileSourceConnector::TransferDataFromCSV}},
{".tetragon", {&FileSourceConnector::TransferDataFromTetragon}},
{".kubescape", {&FileSourceConnector::TransferDataFromKubescape}},
{"", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
{".log", {&FileSourceConnector::TransferDataFromUnstructuredFile}},
}) {}
@@ -168,6 +200,101 @@ Status FileSourceConnector::StopImpl() {

constexpr int kMaxLines = 1000;

void FileSourceConnector::TransferDataFromTetragon(DataTable::DynamicRecordBuilder* /*r*/,
uint64_t /*nanos*/, const std::string& line) {
DataTable::DynamicRecordBuilder r(data_tables_[0]);
rapidjson::Document doc;
doc.Parse(line.c_str());

if (doc.HasParseError() || !doc.IsObject()) {
LOG(ERROR) << "Invalid JSON line: " << line;
return;
}

// Extract "time"
std::string timeStr = "empty";
if (doc.HasMember("time") && doc["time"].IsString()) {
timeStr = doc["time"].GetString();
}
r.Append(0, types::StringValue(timeStr), kMaxStringBytes);
Review comment on lines +215 to +219:
In what situations will time be missing?

Author reply:
I think in the regular workflow it won't be. I will remove the default.


// Extract "node_name"
std::string nodeNameStr = "empty";
if (doc.HasMember("node_name") && doc["node_name"].IsString()) {
nodeNameStr = doc["node_name"].GetString();
}
r.Append(1, types::StringValue(nodeNameStr), kMaxStringBytes);
Review comment on lines +221 to +226:
I think we should consider how to extend the DynamicRecordBuilder to have default values for columns. It would be nice if the schema defined if a column had a default value and could handle this case without the TransferData function needing to specially handle specific columns.

I don't think this should block you from moving forward with this depending on the priorities, but I think it would be possible to enhance TransferDataFromJSON to handle this use case.
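The schema-declared defaults the reviewer describes could look something like the following sketch. ColumnSpec and BuildRow are hypothetical names, not Pixie's API, and plain strings stand in for typed column values.

```cpp
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical sketch: each column may carry a default value, so a generic
// transfer function can fill in missing JSON fields without per-column
// special cases in TransferData.
struct ColumnSpec {
  std::string name;
  std::optional<std::string> default_value;  // nullopt = field is required.
};

// Returns the row values in schema order, or an empty vector if a required
// field is absent from the record.
std::vector<std::string> BuildRow(const std::vector<ColumnSpec>& schema,
                                  const std::map<std::string, std::string>& record) {
  std::vector<std::string> row;
  for (const auto& col : schema) {
    auto it = record.find(col.name);
    if (it != record.end()) {
      row.push_back(it->second);
    } else if (col.default_value.has_value()) {
      row.push_back(*col.default_value);
    } else {
      return {};  // Required field missing: reject the record.
    }
  }
  return row;
}
```

With this shape, TransferDataFromTetragon's hand-written "empty" fallbacks would collapse into schema entries like {"node_name", "empty"}.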


// Find the actual event type (the key that isn't "time" or "node_name")
std::string typeStr;
const rapidjson::Value* payloadVal = nullptr;

for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
std::string key = it->name.GetString();
if (key != "time" && key != "node_name") {
typeStr = key;
payloadVal = &it->value;
break;
Review comment on lines +234 to +237:
This feels dangerous to depend on the sort order of rapidjson's object iteration. Can you explain more details on identifying this type field and why it won't be known up front?

Author reply:
As far as I know, Tetragon events always have three root keys: time, node_name, and a third one that alternates within a fixed list: process_exec, process_exit, process_kprobe, process_uprobe, process_tracepoint, process_loader.

I agree that a fourth property would make this dangerous. Maybe I can check whether the key is in [process_exec, process_exit, process_kprobe, process_uprobe, process_tracepoint, process_loader].
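The allowlist check proposed in the reply could be sketched as follows; IsKnownTetragonEventType is a hypothetical helper name, and the six event types are the ones listed above.

```cpp
#include <array>
#include <string>

// Sketch of the proposed allowlist: instead of taking the first key that is
// not "time" or "node_name", accept only the event types Tetragon is known
// to emit. Unknown keys are skipped, so a fourth root property cannot be
// mistaken for the event payload.
bool IsKnownTetragonEventType(const std::string& key) {
  static const std::array<const char*, 6> kEventTypes = {
      "process_exec",   "process_exit",       "process_kprobe",
      "process_uprobe", "process_tracepoint", "process_loader"};
  for (const char* t : kEventTypes) {
    if (key == t) {
      return true;
    }
  }
  return false;
}
```

The member-iteration loop would then test IsKnownTetragonEventType(key) rather than excluding the two known non-event keys.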

}
}

if (typeStr.empty() || payloadVal == nullptr) {
LOG(WARNING) << "Skipping line, no event type field found.";
return;
}

r.Append(2, types::StringValue(typeStr), kMaxStringBytes);

// Serialize the payload back to string
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
payloadVal->Accept(writer);
r.Append(3, types::StringValue(buffer.GetString()), kMaxStringBytes);
sole::uuid u = sole::uuid4();
r.Append(4, types::UInt128Value(u.ab, u.cd));
return;
}

void FileSourceConnector::TransferDataFromKubescape(DataTable::DynamicRecordBuilder* /*r*/,
uint64_t /*nanos*/, const std::string& line) {
DataTable::DynamicRecordBuilder r(data_tables_[0]);

rapidjson::Document doc;
doc.Parse(line.c_str());
if (doc.HasParseError() || !doc.IsObject()) {
LOG(ERROR) << "Invalid JSON: " << line;
return;
}

// Helper to convert a RapidJSON value to a string. String values are returned
// as-is; other values are re-serialized as JSON.
auto stringify = [](const rapidjson::Value& val) -> std::string {
if (val.IsString()) {
return val.GetString();
}
rapidjson::StringBuffer buf;
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
val.Accept(writer);
return buf.GetString();
};

// Look up a field by name, returning an empty string if it is missing.
// Accessing a missing member via operator[] is undefined behavior in RapidJSON.
auto field = [&](const char* name) -> std::string {
const auto it = doc.FindMember(name);
return it != doc.MemberEnd() ? stringify(it->value) : std::string();
};

// Fill each column; the order must match the schema in DataElementsFromKubescape().
r.Append(0, types::StringValue(field("time")), kMaxStringBytes);
r.Append(1, types::StringValue(field("level")), kMaxStringBytes);
r.Append(2, types::StringValue(field("RuleID")), kMaxStringBytes);
r.Append(3, types::StringValue(field("message")), kMaxStringBytes);
r.Append(4, types::StringValue(field("msg")), kMaxStringBytes);
r.Append(5, types::StringValue(field("event")), kMaxStringBytes);
r.Append(6, types::StringValue(field("BaseRuntimeMetadata")), kMaxStringBytes);
r.Append(7, types::StringValue(field("CloudMetadata")), kMaxStringBytes);
r.Append(8, types::StringValue(field("RuntimeK8sDetails")), kMaxStringBytes);
r.Append(9, types::StringValue(field("RuntimeProcessDetails")), kMaxStringBytes);
sole::uuid u = sole::uuid4();
r.Append(10, types::UInt128Value(u.ab, u.cd));
Review comment on lines +280 to +292:
This is fine to use, but seems brittle in the event that the schema changes (hard coded column offsets, column names hard coded, etc).

Is there a material difference between this and the FileSourceConnector::TransferDataFromJSON function? It seems to me if the data table schema is created properly, the ::TransferDataFromJSON function would behave the same (assuming the time_ column can be changed; see my comment from above).

Author reply:
Actually, for Kubescape the current FileSource is valid to process it. I separated it into its own .kubescape extension and function so it can be extended in the future, e.g. to provide a ClickHouse write.

But if you recommend using just the .json extension for Kubescape and not implementing it separately for now, I am okay with that 😄 I just wanted to explicitly show you what we are trying to do.

return;
}



void FileSourceConnector::TransferDataFromJSON(DataTable::DynamicRecordBuilder* /*r*/,
uint64_t nanos, const std::string& line) {
rapidjson::Document d;
@@ -63,6 +63,10 @@ class FileSourceConnector : public SourceConnector {
const std::string& line);
void TransferDataFromCSV(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
const std::string& line);
void TransferDataFromTetragon(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
const std::string& line);
void TransferDataFromKubescape(DataTable::DynamicRecordBuilder* builder, uint64_t nanos,
const std::string& line);

struct FileTransferSpec {
std::function<void(FileSourceConnector&, DataTable::DynamicRecordBuilder*, uint64_t nanos,
@@ -82,6 +86,8 @@
StatusOr<BackedDataElements> DataElementsFromJSON(std::ifstream& f_stream);
StatusOr<BackedDataElements> DataElementsFromCSV(std::ifstream& f_stream);
StatusOr<BackedDataElements> DataElementsForUnstructuredFile();
StatusOr<BackedDataElements> DataElementsFromTetragon();
StatusOr<BackedDataElements> DataElementsFromKubescape();

} // namespace stirling
} // namespace px