From a7413de53d105d4479fa0c84f07b238b41912b50 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sat, 9 May 2026 20:00:54 +0800 Subject: [PATCH 01/10] optimize get_timeseries_metadata() to eliminate N+1 I/O - Add get_device_timeseries_meta_by_offset() that accepts pre-resolved offsets, skipping redundant load_device_index_entry() tree search and reusing the deserialized measurement index node via get_all_leaf() instead of re-reading the same bytes through load_all_measurement_index_entry() - Add get_all_device_entries() to collect device IDs with their (start_offset, end_offset) in a single index tree traversal - Rewrite get_timeseries_metadata() to use the above two methods - Remove redundant PageArena::init() call in get_timeseries_metadata_impl --- cpp/src/file/tsfile_io_reader.cc | 64 +++++++++++++++++++++ cpp/src/file/tsfile_io_reader.h | 5 ++ cpp/src/reader/tsfile_reader.cc | 97 +++++++++++++++++++++++++++++--- 3 files changed, 158 insertions(+), 8 deletions(-) diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index e96008a47..61a75463d 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -155,6 +155,70 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( return ret; } +int TsFileIOReader::get_device_timeseries_meta_by_offset( + int64_t start_offset, int64_t end_offset, + std::vector& timeseries_indexs, PageArena& pa) { + int ret = E_OK; + load_tsfile_meta_if_necessary(); + + std::vector, int64_t>> + meta_index_entry_list; + bool is_aligned = false; + TimeseriesIndex* time_timeseries_index = nullptr; + + ASSERT(start_offset < end_offset); + const int32_t read_size = end_offset - start_offset; + int32_t ret_read_len = 0; + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { + return E_OOM; + } + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto top_node = 
std::shared_ptr(top_node_ptr, + MetaIndexNode::self_deleter); + if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, + ret_read_len))) { + return ret; + } + if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { + return ret; + } + + is_aligned = is_aligned_device(top_node); + if (is_aligned) { + if (RET_FAIL( + get_time_column_metadata(top_node, time_timeseries_index, pa))) { + return ret; + } + } + + get_all_leaf(top_node, meta_index_entry_list); + + if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa, + timeseries_indexs))) { + return ret; + } + + if (is_aligned && time_timeseries_index != nullptr) { + for (size_t i = 0; i < timeseries_indexs.size(); i++) { + void* buf = pa.alloc(sizeof(AlignedTimeseriesIndex)); + if (IS_NULL(buf)) { + return E_OOM; + } + auto* aligned_ts_idx = new (buf) AlignedTimeseriesIndex; + aligned_ts_idx->time_ts_idx_ = time_timeseries_index; + aligned_ts_idx->value_ts_idx_ = + dynamic_cast(timeseries_indexs[i]); + if (aligned_ts_idx->value_ts_idx_ == nullptr) { + return E_TYPE_NOT_MATCH; + } + timeseries_indexs[i] = aligned_ts_idx; + } + } + return ret; +} + bool TsFileIOReader::filter_stasify(ITimeseriesIndex* ts_index, Filter* time_filter) { ASSERT(ts_index->get_statistic() != nullptr); diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 2f4135e0e..80701a2c8 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -84,6 +84,11 @@ class TsFileIOReader { std::vector& timeseries_indexs, common::PageArena& pa); + int get_device_timeseries_meta_by_offset( + int64_t start_offset, int64_t end_offset, + std::vector& timeseries_indexs, + common::PageArena& pa); + private: FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); } diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index cabf02b08..9e50f39ba 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -25,6 +25,69 
@@ using namespace common; using namespace storage; +namespace { + +struct DeviceMetaEntry { + std::shared_ptr device_id; + int64_t start_offset; + int64_t end_offset; +}; + +int get_all_device_entries(std::vector& entries, + std::shared_ptr index_node, + ReadFile* read_file, PageArena& pa) { + int ret = E_OK; + if (index_node == nullptr) { + return ret; + } + if (index_node->node_type_ == LEAF_DEVICE) { + for (size_t i = 0; i < index_node->children_.size(); i++) { + DeviceMetaEntry entry; + entry.device_id = index_node->children_[i]->get_device_id(); + entry.start_offset = index_node->children_[i]->get_offset(); + entry.end_offset = + (i + 1 < index_node->children_.size()) + ? index_node->children_[i + 1]->get_offset() + : index_node->end_offset_; + entries.push_back(entry); + } + } else { + for (size_t idx = 0; idx < index_node->children_.size(); idx++) { + auto meta_index_entry = index_node->children_[idx]; + int start_offset = meta_index_entry->get_offset(); + int end_offset = index_node->end_offset_; + if (idx + 1 < index_node->children_.size()) { + end_offset = index_node->children_[idx + 1]->get_offset(); + } + ASSERT(end_offset - start_offset > 0); + const int32_t read_size = (int32_t)end_offset - start_offset; + int32_t ret_read_len = 0; + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { + return E_OOM; + } + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto top_node = std::shared_ptr( + top_node_ptr, [](MetaIndexNode* ptr) { + if (ptr) { + ptr->~MetaIndexNode(); + } + }); + if (RET_FAIL(read_file->read(start_offset, data_buf, read_size, + ret_read_len))) { + } else if (RET_FAIL(top_node->device_deserialize_from( + data_buf, read_size))) { + } else { + ret = get_all_device_entries(entries, top_node, read_file, pa); + } + } + } + return ret; +} + +} // namespace + namespace storage { TsFileReader::TsFileReader() : 
read_file_(nullptr), @@ -367,8 +430,6 @@ int TsFileReader::get_timeseries_metadata_impl( std::vector>& result) { int ret = E_OK; std::vector timeseries_indexs; - tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER); - // Pointers are owned by tsfile_reader_meta_pa_; shared_ptr must not delete auto noop_deleter = [](ITimeseriesIndex*) {}; if (RET_FAIL( tsfile_executor_->get_tsfile_io_reader() @@ -397,13 +458,33 @@ DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata( } DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() { - // Collect metadata for all devices present in the file DeviceTimeseriesMetadataMap result; - auto device_ids = get_all_device_ids(); - for (const auto& device_id : device_ids) { - std::vector> list; - if (get_timeseries_metadata_impl(device_id, list) == E_OK) { - result.insert(std::make_pair(device_id, std::move(list))); + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return result; + } + + PageArena pa; + pa.init(512, MOD_TSFILE_READER); + std::vector entries; + for (auto& table_entry : tsfile_meta->table_metadata_index_node_map_) { + get_all_device_entries(entries, table_entry.second, read_file_, pa); + } + + auto noop_deleter = [](ITimeseriesIndex*) {}; + for (auto& device_entry : entries) { + std::vector raw_ts_indexes; + if (tsfile_executor_->get_tsfile_io_reader() + ->get_device_timeseries_meta_by_offset( + device_entry.start_offset, device_entry.end_offset, + raw_ts_indexes, tsfile_reader_meta_pa_) == E_OK) { + std::vector> list; + for (auto ts_idx : raw_ts_indexes) { + list.emplace_back( + std::shared_ptr(ts_idx, noop_deleter)); + } + result.insert( + std::make_pair(device_entry.device_id, std::move(list))); } } return result; From 2a643d44451183c9363d707d28d25b0a3604c94a Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sat, 9 May 2026 20:25:05 +0800 Subject: [PATCH 02/10] fix int64_t truncation and error handling in get_all_device_entries --- 
cpp/src/file/tsfile_io_reader.cc | 4 ++-- cpp/src/reader/tsfile_reader.cc | 25 +++++++++++++++---------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 61a75463d..296556c15 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -187,8 +187,8 @@ int TsFileIOReader::get_device_timeseries_meta_by_offset( is_aligned = is_aligned_device(top_node); if (is_aligned) { - if (RET_FAIL( - get_time_column_metadata(top_node, time_timeseries_index, pa))) { + if (RET_FAIL(get_time_column_metadata(top_node, time_timeseries_index, + pa))) { return ret; } } diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 9e50f39ba..8d9d9b5dc 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -45,22 +45,21 @@ int get_all_device_entries(std::vector& entries, DeviceMetaEntry entry; entry.device_id = index_node->children_[i]->get_device_id(); entry.start_offset = index_node->children_[i]->get_offset(); - entry.end_offset = - (i + 1 < index_node->children_.size()) - ? index_node->children_[i + 1]->get_offset() - : index_node->end_offset_; + entry.end_offset = (i + 1 < index_node->children_.size()) + ? 
index_node->children_[i + 1]->get_offset() + : index_node->end_offset_; entries.push_back(entry); } } else { for (size_t idx = 0; idx < index_node->children_.size(); idx++) { auto meta_index_entry = index_node->children_[idx]; - int start_offset = meta_index_entry->get_offset(); - int end_offset = index_node->end_offset_; + int64_t start_offset = meta_index_entry->get_offset(); + int64_t end_offset = index_node->end_offset_; if (idx + 1 < index_node->children_.size()) { end_offset = index_node->children_[idx + 1]->get_offset(); } ASSERT(end_offset - start_offset > 0); - const int32_t read_size = (int32_t)end_offset - start_offset; + const int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; char* data_buf = (char*)pa.alloc(read_size); void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); @@ -76,11 +75,14 @@ int get_all_device_entries(std::vector& entries, }); if (RET_FAIL(read_file->read(start_offset, data_buf, read_size, ret_read_len))) { - } else if (RET_FAIL(top_node->device_deserialize_from( - data_buf, read_size))) { + } else if (RET_FAIL(top_node->device_deserialize_from(data_buf, + read_size))) { } else { ret = get_all_device_entries(entries, top_node, read_file, pa); } + if (ret != E_OK) { + return ret; + } } } return ret; @@ -468,7 +470,10 @@ DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() { pa.init(512, MOD_TSFILE_READER); std::vector entries; for (auto& table_entry : tsfile_meta->table_metadata_index_node_map_) { - get_all_device_entries(entries, table_entry.second, read_file_, pa); + if (get_all_device_entries(entries, table_entry.second, read_file_, + pa) != E_OK) { + return result; + } } auto noop_deleter = [](ITimeseriesIndex*) {}; From f23b59d17ebb97bd08f7a7e90c7bd5aaf7dfb935 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 08:56:01 +0800 Subject: [PATCH 03/10] optimize queryByRow with exact tag filter via B-tree direct lookup When DeviceMetaIterator receives a single TagEq 
filter (exact match on one tag column), construct the target device ID and use load_device_index_entry() for O(log N) B-tree binary search instead of traversing all internal nodes and scanning all leaf devices. For ecg_dataset (53040 devices), this reduces queryByRow's DeviceTaskIterator::next() from ~117ms to ~0.5ms per query (~230x). Also adds bench_read.cpp for standalone C++ read path benchmarking and makes load_device_index_entry() public for reuse. Co-Authored-By: Claude Opus 4.7 (1M context) --- cpp/examples/CMakeLists.txt | 5 +- cpp/examples/bench_read.cpp | 215 +++++++++++++++++++++++++ cpp/src/file/tsfile_io_reader.h | 10 +- cpp/src/reader/device_meta_iterator.cc | 77 ++++++++- cpp/src/reader/device_meta_iterator.h | 17 +- 5 files changed, 312 insertions(+), 12 deletions(-) create mode 100644 cpp/examples/bench_read.cpp diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index ebe6c66c8..e7895cb49 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -45,4 +45,7 @@ add_subdirectory(c_examples) add_executable(examples examples.cc) target_link_libraries(examples cpp_examples_obj c_examples_obj) -target_link_libraries(examples tsfile) \ No newline at end of file +target_link_libraries(examples tsfile) + +add_executable(bench_read bench_read.cpp) +target_link_libraries(bench_read tsfile) \ No newline at end of file diff --git a/cpp/examples/bench_read.cpp b/cpp/examples/bench_read.cpp new file mode 100644 index 000000000..9cf423e39 --- /dev/null +++ b/cpp/examples/bench_read.cpp @@ -0,0 +1,215 @@ +#include +#include +#include +#include +#include +#include + +#include "reader/tsfile_reader.h" +#include "common/tsfile_common.h" +#include "reader/filter/tag_filter.h" + +using Clock = std::chrono::high_resolution_clock; +using Ms = std::chrono::duration; + +static double to_ms(Clock::time_point a, Clock::time_point b) { + return std::chrono::duration_cast(b - a).count(); +} + +int main(int argc, char* argv[]) { + const 
char* file_path = + "/Volumes/timecho-yuan/data/timeBench/TimeBench_TsFile/" + "ecg_dataset/part_0.tsfile"; + if (argc > 1) file_path = argv[1]; + + const int WARMUP = 20; + const int BENCH = 100; + const int OFFSET = 1000; + const int LIMIT = 3584; + + storage::libtsfile_init(); + std::cout << "=== C++ TsFile Read Benchmark ===" << std::endl; + std::cout << "File: " << file_path << std::endl; + + // ---- Phase 1: Open reader ---- + auto t0 = Clock::now(); + storage::TsFileReader reader; + int ret = reader.open(file_path); + auto t1 = Clock::now(); + if (ret != 0) { + std::cerr << "Failed to open file, ret=" << ret << std::endl; + return 1; + } + std::cout << "\n[1] reader.open(): " << to_ms(t0, t1) << " ms" << std::endl; + + // ---- Phase 2: get_timeseries_metadata ---- + auto t2 = Clock::now(); + auto metadata_map = reader.get_timeseries_metadata(); + auto t3 = Clock::now(); + std::cout << "[2] get_timeseries_metadata(): " << to_ms(t2, t3) + << " ms (" << metadata_map.size() << " devices)" << std::endl; + + // ---- Phase 3: get_all_table_schemas ---- + auto t4 = Clock::now(); + auto schemas = reader.get_all_table_schemas(); + auto t5 = Clock::now(); + std::cout << "[3] get_all_table_schemas(): " << to_ms(t4, t5) + << " ms (" << schemas.size() << " tables)" << std::endl; + + // Pick first table and first device for queryByRow + if (schemas.empty()) { + std::cerr << "No tables found" << std::endl; + return 1; + } + auto table_schema = schemas[0]; + std::string table_name = table_schema->get_table_name(); + std::cout << "\nUsing table: " << table_name << std::endl; + + // Get field columns (non-tag, non-time) + std::vector field_columns; + auto measurement_names = table_schema->get_measurement_names(); + auto categories = table_schema->get_column_categories(); + std::vector tag_columns; + for (size_t i = 0; i < measurement_names.size(); i++) { + if (categories[i] == common::ColumnCategory::FIELD) { + field_columns.push_back(measurement_names[i]); + } else if 
(categories[i] == common::ColumnCategory::TAG) { + tag_columns.push_back(measurement_names[i]); + } + } + std::cout << "Field columns: " << field_columns.size() + << ", Tag columns: " << tag_columns.size() << std::endl; + + // Pick first device to build tag filter + auto device_ids = reader.get_all_devices(table_name); + if (device_ids.empty()) { + std::cerr << "No devices found" << std::endl; + return 1; + } + std::cout << "Total devices: " << device_ids.size() << std::endl; + + // Debug: print first device's segments + { + auto& d = device_ids[0]; + auto& segs = d->get_segments(); + std::cout << "First device segments (" << segs.size() << "): "; + for (size_t i = 0; i < segs.size(); i++) { + std::cout << "[" << i << "]=\"" << (segs[i] ? *segs[i] : "null") << "\" "; + } + std::cout << std::endl; + std::cout << "First device name: " << d->get_device_name() << std::endl; + std::cout << "First device table: " << d->get_table_name() << std::endl; + } + + // Use only first field column for benchmark (like Python does) + std::vector query_columns; + if (!field_columns.empty()) { + query_columns.push_back(field_columns[0]); + } + std::cout << "Query column: " << query_columns[0] << std::endl; + std::cout << "Offset: " << OFFSET << ", Limit: " << LIMIT << std::endl; + + // ---- Phase 4: Benchmark queryByRow with detail timing ---- + // Build tag filter for first device + auto& first_device = device_ids[0]; + first_device->split_table_name(); + + std::cout << "\n=== queryByRow Benchmark ===" << std::endl; + + // Stats accumulators + double total_build_filter = 0, total_query_create = 0; + double total_first_next = 0, total_remaining_next = 0; + double total_close = 0; + int total_rows = 0; + + for (int iter = 0; iter < WARMUP + BENCH; iter++) { + // Pick a device (round-robin for variety) + auto& device = device_ids[iter % device_ids.size()]; + device->split_table_name(); + + // 4a: Build tag filter + auto tf0 = Clock::now(); + storage::Filter* tag_filter = nullptr; + { + 
auto ts = reader.get_table_schema(table_name); + storage::TagFilterBuilder builder(ts.get()); + storage::Filter* combined = nullptr; + for (size_t i = 0; i < tag_columns.size(); i++) { + int seg_idx = i + 1; + std::string* seg = device->get_split_segname_at(seg_idx); + if (seg == nullptr) continue; + auto* eq = builder.eq(tag_columns[i], *seg); + if (combined == nullptr) { + combined = eq; + } else { + combined = builder.and_filter(combined, eq); + } + } + tag_filter = combined; + } + auto tf1 = Clock::now(); + + // 4b: Create query (ResultSet) + auto tq0 = Clock::now(); + storage::ResultSet* result_set = nullptr; + ret = reader.queryByRow(table_name, query_columns, OFFSET, LIMIT, + result_set, tag_filter); + auto tq1 = Clock::now(); + if (ret != 0 || result_set == nullptr) { + if (tag_filter) delete tag_filter; + continue; + } + + // 4c: First next() call (triggers lazy init) + auto tn0 = Clock::now(); + bool has_next = false; + ret = result_set->next(has_next); + auto tn1 = Clock::now(); + int row_count = has_next ? 1 : 0; + + // 4d: Remaining next() calls + auto tr0 = Clock::now(); + while (ret == 0 && has_next) { + ret = result_set->next(has_next); + if (has_next) row_count++; + } + auto tr1 = Clock::now(); + + // 4e: Close + auto tc0 = Clock::now(); + result_set->close(); + reader.destroy_query_data_set(result_set); + auto tc1 = Clock::now(); + + if (iter >= WARMUP) { + total_build_filter += to_ms(tf0, tf1); + total_query_create += to_ms(tq0, tq1); + total_first_next += to_ms(tn0, tn1); + total_remaining_next += to_ms(tr0, tr1); + total_close += to_ms(tc0, tc1); + total_rows += row_count; + } + + if (iter == WARMUP) { + std::cout << "Warmup done (" << WARMUP << " iters). 
" + << "First bench iter: " << row_count << " rows" << std::endl; + } + } + + int N = BENCH; + std::cout << "\n=== Results (avg over " << N << " iterations) ===" << std::endl; + std::cout << " build_tag_filter: " << (total_build_filter / N) << " ms" << std::endl; + std::cout << " queryByRow create: " << (total_query_create / N) << " ms" << std::endl; + std::cout << " first next(): " << (total_first_next / N) << " ms" << std::endl; + std::cout << " remaining next(): " << (total_remaining_next / N) << " ms" + << " (avg " << (total_rows / N) << " rows)" << std::endl; + std::cout << " close+destroy: " << (total_close / N) << " ms" << std::endl; + std::cout << " ----- total: " + << ((total_build_filter + total_query_create + total_first_next + + total_remaining_next + total_close) / N) + << " ms" << std::endl; + + reader.close(); + std::cout << "\nDone." << std::endl; + return 0; +} diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 80701a2c8..85443326f 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -89,6 +89,11 @@ class TsFileIOReader { std::vector& timeseries_indexs, common::PageArena& pa); + int load_device_index_entry( + std::shared_ptr target_name, + std::shared_ptr& device_index_entry, + int64_t& end_offset); + private: FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); } @@ -96,11 +101,6 @@ class TsFileIOReader { int load_tsfile_meta_if_necessary(); - int load_device_index_entry( - std::shared_ptr target_name, - std::shared_ptr& device_index_entry, - int64_t& end_offset); - int load_measurement_index_entry( const std::string& measurement_name, std::shared_ptr top_node, diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index a41a29e6c..d1177ff38 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -43,6 +43,16 @@ bool DeviceMetaIterator::has_next() { return true; } + if 
(direct_device_id_ != nullptr) { + if (direct_lookup_done_) { + return false; + } + if (load_results_direct() != common::E_OK) { + return false; + } + return !result_cache_.empty(); + } + if (load_results() != common::E_OK) { return false; } @@ -63,9 +73,6 @@ int DeviceMetaIterator::next( int DeviceMetaIterator::load_results() { int root_num = meta_index_nodes_.size(); while (!meta_index_nodes_.empty()) { - // To avoid ASan overflow. - // using `const auto&` creates a reference - // to a queue element that may become invalid. auto meta_data_index_node = meta_index_nodes_.front(); meta_index_nodes_.pop(); const auto& node_type = meta_data_index_node->node_type_; @@ -80,7 +87,6 @@ int DeviceMetaIterator::load_results() { meta_data_index_node->~MetaIndexNode(); } } - return common::E_OK; } @@ -135,4 +141,67 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) { } return ret; } + +void DeviceMetaIterator::try_setup_direct_lookup(MetaIndexNode* root_node) { + if (id_filter_ == nullptr) return; + + const auto* eq = dynamic_cast(id_filter_); + if (eq == nullptr) return; + + // For a single TagEq filter, we can construct the exact device ID. + // segments layout: [0]=table_name, [col_idx_]=tag_value + // We need the table name from the root node's first child. 
+ if (root_node->children_.empty()) return; + + auto first_device = root_node->children_[0]->get_device_id(); + if (first_device == nullptr) return; + + std::string table_name = first_device->get_table_name(); + int num_segments = eq->col_idx_ + 1; + std::vector segs(num_segments); + segs[0] = table_name; + for (int i = 1; i < num_segments; i++) { + segs[i] = ""; + } + segs[eq->col_idx_] = eq->value_; + direct_device_id_ = std::make_shared(segs); + direct_root_node_ = root_node; +} + +int DeviceMetaIterator::load_results_direct() { + int ret = common::E_OK; + direct_lookup_done_ = true; + + if (direct_device_id_ == nullptr) { + return common::E_OK; + } + + auto device_comparable = std::make_shared(direct_device_id_); + + std::shared_ptr device_index_entry; + int64_t end_offset = 0; + + ret = io_reader_->load_device_index_entry( + device_comparable, device_index_entry, end_offset); + + if (ret != common::E_OK || device_index_entry == nullptr) { + return common::E_OK; + } + + int64_t start_offset = device_index_entry->get_offset(); + MetaIndexNode* child_node = nullptr; + if (RET_FAIL(io_reader_->read_device_meta_index( + start_offset, end_offset, pa_, child_node, true))) { + return ret; + } + + auto device_id = device_index_entry->get_device_id(); + if (should_split_device_name) { + device_id->split_table_name(); + } + result_cache_.push(std::make_pair(device_id, child_node)); + + return common::E_OK; +} + } // namespace storage \ No newline at end of file diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 704098b4d..4873bdd91 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -21,6 +21,8 @@ #define READER_DEVICE_META_ITERATOR_H #include +#include +#include #include "file/tsfile_io_reader.h" #include "reader/expression.h" @@ -34,15 +36,18 @@ class DeviceMetaIterator { const Filter* id_filter) : io_reader_(io_reader), id_filter_(id_filter), - should_split_device_name(false) { 
+ should_split_device_name(false), + direct_lookup_done_(false) { meta_index_nodes_.push(meat_index_node); pa_.init(512, common::MOD_DEVICE_META_ITER); + try_setup_direct_lookup(meat_index_node); } DeviceMetaIterator(TsFileIOReader* io_reader, const std::vector& meta_index_node_list, const Filter* id_filter) - : io_reader_(io_reader), id_filter_(id_filter) { + : io_reader_(io_reader), id_filter_(id_filter), + direct_lookup_done_(false) { for (auto meta_index_node : meta_index_node_list) { meta_index_nodes_.push(meta_index_node); } @@ -62,6 +67,10 @@ class DeviceMetaIterator { int load_results(); int load_leaf_device(MetaIndexNode* meta_index_node); int load_internal_node(MetaIndexNode* meta_index_node); + + void try_setup_direct_lookup(MetaIndexNode* root_node); + int load_results_direct(); + TsFileIOReader* io_reader_; std::queue meta_index_nodes_; std::queue, MetaIndexNode*>> @@ -69,6 +78,10 @@ class DeviceMetaIterator { const Filter* id_filter_; common::PageArena pa_; bool should_split_device_name; + + bool direct_lookup_done_; + std::shared_ptr direct_device_id_; + MetaIndexNode* direct_root_node_ = nullptr; }; } // end namespace storage From 44a3c450ee05c47f5809487723d82168fd37c7e5 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 08:59:13 +0800 Subject: [PATCH 04/10] style: clang-format Co-Authored-By: Claude Opus 4.7 (1M context) --- cpp/examples/bench_read.cpp | 48 +++++++++++++++----------- cpp/src/reader/device_meta_iterator.cc | 11 +++--- cpp/src/reader/device_meta_iterator.h | 3 +- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/cpp/examples/bench_read.cpp b/cpp/examples/bench_read.cpp index 9cf423e39..cfe975bfc 100644 --- a/cpp/examples/bench_read.cpp +++ b/cpp/examples/bench_read.cpp @@ -5,9 +5,9 @@ #include #include -#include "reader/tsfile_reader.h" #include "common/tsfile_common.h" #include "reader/filter/tag_filter.h" +#include "reader/tsfile_reader.h" using Clock = std::chrono::high_resolution_clock; using Ms = 
std::chrono::duration; @@ -23,9 +23,9 @@ int main(int argc, char* argv[]) { if (argc > 1) file_path = argv[1]; const int WARMUP = 20; - const int BENCH = 100; + const int BENCH = 100; const int OFFSET = 1000; - const int LIMIT = 3584; + const int LIMIT = 3584; storage::libtsfile_init(); std::cout << "=== C++ TsFile Read Benchmark ===" << std::endl; @@ -46,15 +46,15 @@ int main(int argc, char* argv[]) { auto t2 = Clock::now(); auto metadata_map = reader.get_timeseries_metadata(); auto t3 = Clock::now(); - std::cout << "[2] get_timeseries_metadata(): " << to_ms(t2, t3) - << " ms (" << metadata_map.size() << " devices)" << std::endl; + std::cout << "[2] get_timeseries_metadata(): " << to_ms(t2, t3) << " ms (" + << metadata_map.size() << " devices)" << std::endl; // ---- Phase 3: get_all_table_schemas ---- auto t4 = Clock::now(); auto schemas = reader.get_all_table_schemas(); auto t5 = Clock::now(); - std::cout << "[3] get_all_table_schemas(): " << to_ms(t4, t5) - << " ms (" << schemas.size() << " tables)" << std::endl; + std::cout << "[3] get_all_table_schemas(): " << to_ms(t4, t5) << " ms (" + << schemas.size() << " tables)" << std::endl; // Pick first table and first device for queryByRow if (schemas.empty()) { @@ -94,7 +94,8 @@ int main(int argc, char* argv[]) { auto& segs = d->get_segments(); std::cout << "First device segments (" << segs.size() << "): "; for (size_t i = 0; i < segs.size(); i++) { - std::cout << "[" << i << "]=\"" << (segs[i] ? *segs[i] : "null") << "\" "; + std::cout << "[" << i << "]=\"" << (segs[i] ? 
*segs[i] : "null") + << "\" "; } std::cout << std::endl; std::cout << "First device name: " << d->get_device_name() << std::endl; @@ -182,31 +183,38 @@ int main(int argc, char* argv[]) { auto tc1 = Clock::now(); if (iter >= WARMUP) { - total_build_filter += to_ms(tf0, tf1); - total_query_create += to_ms(tq0, tq1); - total_first_next += to_ms(tn0, tn1); + total_build_filter += to_ms(tf0, tf1); + total_query_create += to_ms(tq0, tq1); + total_first_next += to_ms(tn0, tn1); total_remaining_next += to_ms(tr0, tr1); - total_close += to_ms(tc0, tc1); - total_rows += row_count; + total_close += to_ms(tc0, tc1); + total_rows += row_count; } if (iter == WARMUP) { std::cout << "Warmup done (" << WARMUP << " iters). " - << "First bench iter: " << row_count << " rows" << std::endl; + << "First bench iter: " << row_count << " rows" + << std::endl; } } int N = BENCH; - std::cout << "\n=== Results (avg over " << N << " iterations) ===" << std::endl; - std::cout << " build_tag_filter: " << (total_build_filter / N) << " ms" << std::endl; - std::cout << " queryByRow create: " << (total_query_create / N) << " ms" << std::endl; - std::cout << " first next(): " << (total_first_next / N) << " ms" << std::endl; + std::cout << "\n=== Results (avg over " << N + << " iterations) ===" << std::endl; + std::cout << " build_tag_filter: " << (total_build_filter / N) << " ms" + << std::endl; + std::cout << " queryByRow create: " << (total_query_create / N) << " ms" + << std::endl; + std::cout << " first next(): " << (total_first_next / N) << " ms" + << std::endl; std::cout << " remaining next(): " << (total_remaining_next / N) << " ms" << " (avg " << (total_rows / N) << " rows)" << std::endl; - std::cout << " close+destroy: " << (total_close / N) << " ms" << std::endl; + std::cout << " close+destroy: " << (total_close / N) << " ms" + << std::endl; std::cout << " ----- total: " << ((total_build_filter + total_query_create + total_first_next + - total_remaining_next + total_close) / N) + 
total_remaining_next + total_close) / + N) << " ms" << std::endl; reader.close(); diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index d1177ff38..c76b434bd 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -176,13 +176,14 @@ int DeviceMetaIterator::load_results_direct() { return common::E_OK; } - auto device_comparable = std::make_shared(direct_device_id_); + auto device_comparable = + std::make_shared(direct_device_id_); std::shared_ptr device_index_entry; int64_t end_offset = 0; - ret = io_reader_->load_device_index_entry( - device_comparable, device_index_entry, end_offset); + ret = io_reader_->load_device_index_entry(device_comparable, + device_index_entry, end_offset); if (ret != common::E_OK || device_index_entry == nullptr) { return common::E_OK; @@ -190,8 +191,8 @@ int DeviceMetaIterator::load_results_direct() { int64_t start_offset = device_index_entry->get_offset(); MetaIndexNode* child_node = nullptr; - if (RET_FAIL(io_reader_->read_device_meta_index( - start_offset, end_offset, pa_, child_node, true))) { + if (RET_FAIL(io_reader_->read_device_meta_index(start_offset, end_offset, + pa_, child_node, true))) { return ret; } diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 4873bdd91..da6a37dc4 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -46,7 +46,8 @@ class DeviceMetaIterator { DeviceMetaIterator(TsFileIOReader* io_reader, const std::vector& meta_index_node_list, const Filter* id_filter) - : io_reader_(io_reader), id_filter_(id_filter), + : io_reader_(io_reader), + id_filter_(id_filter), direct_lookup_done_(false) { for (auto meta_index_node : meta_index_node_list) { meta_index_nodes_.push(meta_index_node); From 4dfe90f9f1ec7ab46ad488efe2bad42811f1dba3 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 09:31:36 +0800 Subject: [PATCH 05/10] Update
C++ CLAUDE.md to persist format way --- cpp/CLAUDE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/CLAUDE.md b/cpp/CLAUDE.md index 00157dd5a..674771759 100644 --- a/cpp/CLAUDE.md +++ b/cpp/CLAUDE.md @@ -92,6 +92,7 @@ cpp/src/ ## Code Style - **Formatter**: clang-format (Google style), configured in `.clang-format` +- After modifying C++ code, run from the repo root to format: `./mvnw spotless:apply -P with-cpp` ## Testing From 8813a6b725dffa19b771208107478087d247ddac Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 09:41:04 +0800 Subject: [PATCH 06/10] fix direct lookup for multi-tag-column devices The TagEq direct B-tree lookup constructed a device ID with only col_idx_+1 segments, but devices with multiple tag columns have more. The segment count mismatch caused operator== to always return false, so queries returned 0 rows. Guard the optimization to only activate when the filter fully specifies the device ID (single tag column). --- cpp/src/reader/device_meta_iterator.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index c76b434bd..83ff4493a 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -148,19 +148,22 @@ void DeviceMetaIterator::try_setup_direct_lookup(MetaIndexNode* root_node) { const auto* eq = dynamic_cast(id_filter_); if (eq == nullptr) return; - // For a single TagEq filter, we can construct the exact device ID. - // segments layout: [0]=table_name, [col_idx_]=tag_value - // We need the table name from the root node's first child. 
if (root_node->children_.empty()) return; auto first_device = root_node->children_[0]->get_device_id(); if (first_device == nullptr) return; + auto first_segments = first_device->get_segments(); + int actual_segment_count = static_cast(first_segments.size()); + + // Only use direct lookup when the single TagEq filter fully specifies + // the device ID (exactly one tag column, so segments = [table_name, tag]). + if (actual_segment_count != eq->col_idx_ + 1) return; + std::string table_name = first_device->get_table_name(); - int num_segments = eq->col_idx_ + 1; - std::vector segs(num_segments); + std::vector segs(actual_segment_count); segs[0] = table_name; - for (int i = 1; i < num_segments; i++) { + for (int i = 1; i < actual_segment_count; i++) { segs[i] = ""; } segs[eq->col_idx_] = eq->value_; From d1b09be6fb84df5d3ef2e354b0427b0ad6a46aa1 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 09:50:55 +0800 Subject: [PATCH 07/10] add unit tests for timeseries metadata and direct B-tree lookup - GetTimeseriesMetadataTableModel: test get_timeseries_metadata() with table model, covering get_all_device_entries (LEAF_DEVICE path) and get_device_timeseries_meta_by_offset - GetTimeseriesMetadataMultiTable: test with two tables, covering the multi-table iteration in get_timeseries_metadata - DirectLookupSingleTagColumn: test the TagEq direct B-tree lookup optimization with a single-tag-column table, covering try_setup_direct_lookup and load_results_direct - DirectLookupNonExistDevice: test direct lookup returns 0 rows when the device does not exist --- .../table_view/tsfile_reader_table_test.cc | 260 ++++++++++++++++++ 1 file changed, 260 insertions(+) diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 1f63573e1..4cf73e56e 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -885,4 +885,264 @@ 
TEST_F(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) { ASSERT_EQ(nullable_rows, total_rows); ASSERT_EQ(reader.close(), common::E_OK); +} + +TEST_F(TsFileTableReaderTest, GetTimeseriesMetadataTableModel) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("device", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("value", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = new TableSchema("meta_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + int num_devices = 3; + int points = 10; + int total_rows = num_devices * points; + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), total_rows); + for (int d = 0; d < num_devices; d++) { + std::string dev = "dev" + std::to_string(d); + for (int t = 0; t < points; t++) { + int row = d * points + t; + tablet.add_timestamp(row, static_cast(t)); + tablet.add_value(row, "device", dev.c_str()); + tablet.add_value(row, "value", static_cast(d * 100 + t)); + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + auto meta_map = reader.get_timeseries_metadata(); + ASSERT_EQ(meta_map.size(), static_cast(num_devices)); + + for (auto& entry : meta_map) { + auto& ts_list = entry.second; + ASSERT_FALSE(ts_list.empty()); + for (auto& ts_idx : ts_list) { + ASSERT_NE(ts_idx->get_statistic(), nullptr); + ASSERT_EQ(ts_idx->get_statistic()->count_, points); + } + } + + ASSERT_EQ(reader.close(), common::E_OK); + delete 
table_schema; +} + +TEST_F(TsFileTableReaderTest, GetTimeseriesMetadataMultiTable) { + std::vector schemas0; + std::vector cats0; + schemas0.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats0.emplace_back(ColumnCategory::TAG); + schemas0.emplace_back(new MeasurementSchema("v0", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats0.emplace_back(ColumnCategory::FIELD); + auto* schema0 = new TableSchema("table_a", schemas0, cats0); + auto writer = std::make_shared(&write_file_, schema0); + + storage::Tablet tablet0( + schema0->get_table_name(), schema0->get_measurement_names(), + schema0->get_data_types(), schema0->get_column_categories(), 10); + for (int d = 0; d < 2; d++) { + std::string dev = "a_dev" + std::to_string(d); + for (int t = 0; t < 5; t++) { + int row = d * 5 + t; + tablet0.add_timestamp(row, static_cast(t)); + tablet0.add_value(row, "tag", dev.c_str()); + tablet0.add_value(row, "v0", static_cast(t)); + } + } + ASSERT_EQ(writer->write_table(tablet0), common::E_OK); + + std::vector schemas1; + std::vector cats1; + schemas1.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats1.emplace_back(ColumnCategory::TAG); + schemas1.emplace_back(new MeasurementSchema("v1", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + cats1.emplace_back(ColumnCategory::FIELD); + auto* schema1 = new TableSchema("table_b", schemas1, cats1); + auto schema1_ptr = std::shared_ptr(schema1); + writer->register_table(schema1_ptr); + + storage::Tablet tablet1( + schema1->get_table_name(), schema1->get_measurement_names(), + schema1->get_data_types(), schema1->get_column_categories(), 24); + for (int d = 0; d < 3; d++) { + std::string dev = "b_dev" + std::to_string(d); + for (int t = 0; t < 8; t++) { + int row = d * 8 + t; + tablet1.add_timestamp(row, static_cast(t)); + 
tablet1.add_value(row, "tag", dev.c_str()); + tablet1.add_value(row, "v1", static_cast(t)); + } + } + ASSERT_EQ(writer->write_table(tablet1), common::E_OK); + + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + auto meta_map = reader.get_timeseries_metadata(); + ASSERT_EQ(meta_map.size(), 5u); + + int table_a_count = 0; + int table_b_count = 0; + for (auto& entry : meta_map) { + auto table_name = entry.first->get_table_name(); + if (table_name == "table_a") { + table_a_count++; + for (auto& ts : entry.second) { + ASSERT_EQ(ts->get_statistic()->count_, 5); + } + } else if (table_name == "table_b") { + table_b_count++; + for (auto& ts : entry.second) { + ASSERT_EQ(ts->get_statistic()->count_, 8); + } + } + } + ASSERT_EQ(table_a_count, 2); + ASSERT_EQ(table_b_count, 3); + + ASSERT_EQ(reader.close(), common::E_OK); + delete schema0; +} + +TEST_F(TsFileTableReaderTest, DirectLookupSingleTagColumn) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("single_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + int num_devices = 5; + int points = 10; + storage::Tablet tablet( + table_schema->get_table_name(), table_schema->get_measurement_names(), + table_schema->get_data_types(), table_schema->get_column_categories(), + num_devices * points); + for (int d = 0; d < num_devices; d++) { + std::string dev_name = "dev" + std::to_string(d); + for (int t = 0; t < points; t++) { + int row = d * points + t; + 
tablet.add_timestamp(row, static_cast(t)); + tablet.add_value(row, "tag", dev_name.c_str()); + tablet.add_value(row, "val", static_cast(d * 100 + t)); + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = TagFilterBuilder(table_schema).eq("tag", "dev2"); + std::vector cols = {"tag", "val"}; + int ret = reader.query("single_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + ASSERT_EQ(table_result_set->get_value(1), row_num % points); + auto* tag_val = table_result_set->get_value(2); + std::string expected_tag = "dev2"; + ASSERT_EQ(std::string(tag_val->buf_, tag_val->len_), expected_tag); + ASSERT_EQ(table_result_set->get_value(3), + static_cast(200 + row_num)); + row_num++; + } + ASSERT_EQ(row_num, points); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; +} + +TEST_F(TsFileTableReaderTest, DirectLookupNonExistDevice) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("tag", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("single_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + storage::Tablet tablet(table_schema->get_table_name(), + 
table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), 5); + for (int t = 0; t < 5; t++) { + tablet.add_timestamp(t, static_cast(t)); + tablet.add_value(t, "tag", "existing_dev"); + tablet.add_value(t, "val", static_cast(t)); + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = TagFilterBuilder(table_schema).eq("tag", "non_exist"); + std::vector cols = {"tag", "val"}; + int ret = reader.query("single_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + row_num++; + } + ASSERT_EQ(row_num, 0); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; } \ No newline at end of file From 08d0373ee65e1e453f7141565c87a6c34e722c2a Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 11:28:51 +0800 Subject: [PATCH 08/10] delete examples/bench_read.cpp --- cpp/examples/CMakeLists.txt | 5 +- cpp/examples/bench_read.cpp | 223 ------------------------------------ 2 files changed, 1 insertion(+), 227 deletions(-) delete mode 100644 cpp/examples/bench_read.cpp diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index e7895cb49..ebe6c66c8 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -45,7 +45,4 @@ add_subdirectory(c_examples) add_executable(examples examples.cc) target_link_libraries(examples cpp_examples_obj c_examples_obj) -target_link_libraries(examples tsfile) - -add_executable(bench_read bench_read.cpp) 
-target_link_libraries(bench_read tsfile) \ No newline at end of file +target_link_libraries(examples tsfile) \ No newline at end of file diff --git a/cpp/examples/bench_read.cpp b/cpp/examples/bench_read.cpp deleted file mode 100644 index cfe975bfc..000000000 --- a/cpp/examples/bench_read.cpp +++ /dev/null @@ -1,223 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "common/tsfile_common.h" -#include "reader/filter/tag_filter.h" -#include "reader/tsfile_reader.h" - -using Clock = std::chrono::high_resolution_clock; -using Ms = std::chrono::duration; - -static double to_ms(Clock::time_point a, Clock::time_point b) { - return std::chrono::duration_cast(b - a).count(); -} - -int main(int argc, char* argv[]) { - const char* file_path = - "/Volumes/timecho-yuan/data/timeBench/TimeBench_TsFile/" - "ecg_dataset/part_0.tsfile"; - if (argc > 1) file_path = argv[1]; - - const int WARMUP = 20; - const int BENCH = 100; - const int OFFSET = 1000; - const int LIMIT = 3584; - - storage::libtsfile_init(); - std::cout << "=== C++ TsFile Read Benchmark ===" << std::endl; - std::cout << "File: " << file_path << std::endl; - - // ---- Phase 1: Open reader ---- - auto t0 = Clock::now(); - storage::TsFileReader reader; - int ret = reader.open(file_path); - auto t1 = Clock::now(); - if (ret != 0) { - std::cerr << "Failed to open file, ret=" << ret << std::endl; - return 1; - } - std::cout << "\n[1] reader.open(): " << to_ms(t0, t1) << " ms" << std::endl; - - // ---- Phase 2: get_timeseries_metadata ---- - auto t2 = Clock::now(); - auto metadata_map = reader.get_timeseries_metadata(); - auto t3 = Clock::now(); - std::cout << "[2] get_timeseries_metadata(): " << to_ms(t2, t3) << " ms (" - << metadata_map.size() << " devices)" << std::endl; - - // ---- Phase 3: get_all_table_schemas ---- - auto t4 = Clock::now(); - auto schemas = reader.get_all_table_schemas(); - auto t5 = Clock::now(); - std::cout << "[3] get_all_table_schemas(): " << to_ms(t4, t5) << " ms (" 
- << schemas.size() << " tables)" << std::endl; - - // Pick first table and first device for queryByRow - if (schemas.empty()) { - std::cerr << "No tables found" << std::endl; - return 1; - } - auto table_schema = schemas[0]; - std::string table_name = table_schema->get_table_name(); - std::cout << "\nUsing table: " << table_name << std::endl; - - // Get field columns (non-tag, non-time) - std::vector field_columns; - auto measurement_names = table_schema->get_measurement_names(); - auto categories = table_schema->get_column_categories(); - std::vector tag_columns; - for (size_t i = 0; i < measurement_names.size(); i++) { - if (categories[i] == common::ColumnCategory::FIELD) { - field_columns.push_back(measurement_names[i]); - } else if (categories[i] == common::ColumnCategory::TAG) { - tag_columns.push_back(measurement_names[i]); - } - } - std::cout << "Field columns: " << field_columns.size() - << ", Tag columns: " << tag_columns.size() << std::endl; - - // Pick first device to build tag filter - auto device_ids = reader.get_all_devices(table_name); - if (device_ids.empty()) { - std::cerr << "No devices found" << std::endl; - return 1; - } - std::cout << "Total devices: " << device_ids.size() << std::endl; - - // Debug: print first device's segments - { - auto& d = device_ids[0]; - auto& segs = d->get_segments(); - std::cout << "First device segments (" << segs.size() << "): "; - for (size_t i = 0; i < segs.size(); i++) { - std::cout << "[" << i << "]=\"" << (segs[i] ? 
*segs[i] : "null") - << "\" "; - } - std::cout << std::endl; - std::cout << "First device name: " << d->get_device_name() << std::endl; - std::cout << "First device table: " << d->get_table_name() << std::endl; - } - - // Use only first field column for benchmark (like Python does) - std::vector query_columns; - if (!field_columns.empty()) { - query_columns.push_back(field_columns[0]); - } - std::cout << "Query column: " << query_columns[0] << std::endl; - std::cout << "Offset: " << OFFSET << ", Limit: " << LIMIT << std::endl; - - // ---- Phase 4: Benchmark queryByRow with detail timing ---- - // Build tag filter for first device - auto& first_device = device_ids[0]; - first_device->split_table_name(); - - std::cout << "\n=== queryByRow Benchmark ===" << std::endl; - - // Stats accumulators - double total_build_filter = 0, total_query_create = 0; - double total_first_next = 0, total_remaining_next = 0; - double total_close = 0; - int total_rows = 0; - - for (int iter = 0; iter < WARMUP + BENCH; iter++) { - // Pick a device (round-robin for variety) - auto& device = device_ids[iter % device_ids.size()]; - device->split_table_name(); - - // 4a: Build tag filter - auto tf0 = Clock::now(); - storage::Filter* tag_filter = nullptr; - { - auto ts = reader.get_table_schema(table_name); - storage::TagFilterBuilder builder(ts.get()); - storage::Filter* combined = nullptr; - for (size_t i = 0; i < tag_columns.size(); i++) { - int seg_idx = i + 1; - std::string* seg = device->get_split_segname_at(seg_idx); - if (seg == nullptr) continue; - auto* eq = builder.eq(tag_columns[i], *seg); - if (combined == nullptr) { - combined = eq; - } else { - combined = builder.and_filter(combined, eq); - } - } - tag_filter = combined; - } - auto tf1 = Clock::now(); - - // 4b: Create query (ResultSet) - auto tq0 = Clock::now(); - storage::ResultSet* result_set = nullptr; - ret = reader.queryByRow(table_name, query_columns, OFFSET, LIMIT, - result_set, tag_filter); - auto tq1 = Clock::now(); - 
if (ret != 0 || result_set == nullptr) { - if (tag_filter) delete tag_filter; - continue; - } - - // 4c: First next() call (triggers lazy init) - auto tn0 = Clock::now(); - bool has_next = false; - ret = result_set->next(has_next); - auto tn1 = Clock::now(); - int row_count = has_next ? 1 : 0; - - // 4d: Remaining next() calls - auto tr0 = Clock::now(); - while (ret == 0 && has_next) { - ret = result_set->next(has_next); - if (has_next) row_count++; - } - auto tr1 = Clock::now(); - - // 4e: Close - auto tc0 = Clock::now(); - result_set->close(); - reader.destroy_query_data_set(result_set); - auto tc1 = Clock::now(); - - if (iter >= WARMUP) { - total_build_filter += to_ms(tf0, tf1); - total_query_create += to_ms(tq0, tq1); - total_first_next += to_ms(tn0, tn1); - total_remaining_next += to_ms(tr0, tr1); - total_close += to_ms(tc0, tc1); - total_rows += row_count; - } - - if (iter == WARMUP) { - std::cout << "Warmup done (" << WARMUP << " iters). " - << "First bench iter: " << row_count << " rows" - << std::endl; - } - } - - int N = BENCH; - std::cout << "\n=== Results (avg over " << N - << " iterations) ===" << std::endl; - std::cout << " build_tag_filter: " << (total_build_filter / N) << " ms" - << std::endl; - std::cout << " queryByRow create: " << (total_query_create / N) << " ms" - << std::endl; - std::cout << " first next(): " << (total_first_next / N) << " ms" - << std::endl; - std::cout << " remaining next(): " << (total_remaining_next / N) << " ms" - << " (avg " << (total_rows / N) << " rows)" << std::endl; - std::cout << " close+destroy: " << (total_close / N) << " ms" - << std::endl; - std::cout << " ----- total: " - << ((total_build_filter + total_query_create + total_first_next + - total_remaining_next + total_close) / - N) - << " ms" << std::endl; - - reader.close(); - std::cout << "\nDone." 
<< std::endl; - return 0; -} From 725642e77ff48c9895322ed0c1c55b02bfc84719 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 15:22:49 +0800 Subject: [PATCH 09/10] fix direct lookup guard for multi-tag-column tables The previous guard (actual_segment_count != eq->col_idx_ + 1) still allowed direct lookup when filtering on the last tag column in a multi-tag table (e.g., 2 tags: segment_count=3, col_idx=2, 2+1=3). The constructed device ID had empty segments for unfiltered tags, causing B-tree lookup to find nothing. Restrict direct lookup to single-tag tables only (segment_count == 2). --- cpp/src/reader/device_meta_iterator.cc | 4 +- .../table_view/tsfile_reader_table_test.cc | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index 83ff4493a..bf01b23a5 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -156,9 +156,7 @@ void DeviceMetaIterator::try_setup_direct_lookup(MetaIndexNode* root_node) { auto first_segments = first_device->get_segments(); int actual_segment_count = static_cast(first_segments.size()); - // Only use direct lookup when the single TagEq filter fully specifies - // the device ID (exactly one tag column, so segments = [table_name, tag]). 
- if (actual_segment_count != eq->col_idx_ + 1) return; + if (actual_segment_count != 2) return; std::string table_name = first_device->get_table_name(); std::vector segs(actual_segment_count); diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 4cf73e56e..21744b5eb 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -1141,6 +1141,83 @@ TEST_F(TsFileTableReaderTest, DirectLookupNonExistDevice) { } ASSERT_EQ(row_num, 0); + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; + delete tag_filter; +} + +TEST_F(TsFileTableReaderTest, MultiTagColumnFilterOnSecondTag) { + std::vector schemas; + std::vector categories; + schemas.emplace_back(new MeasurementSchema("region", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("device", TSDataType::STRING, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::TAG); + schemas.emplace_back(new MeasurementSchema("val", TSDataType::INT64, + TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + categories.emplace_back(ColumnCategory::FIELD); + auto* table_schema = + new TableSchema("multi_tag_table", schemas, categories); + auto writer = + std::make_shared(&write_file_, table_schema); + + struct DeviceData { + std::string region; + std::string device; + int start; + int count; + }; + std::vector devices = { + {"north", "dev_a", 0, 5}, + {"north", "dev_b", 5, 5}, + {"south", "dev_c", 10, 5}, + {"east", "dev_d", 15, 5}, + }; + + int total = 20; + storage::Tablet tablet( + table_schema->get_table_name(), table_schema->get_measurement_names(), + table_schema->get_data_types(), table_schema->get_column_categories(), + total); + int row = 0; + for (auto& d : 
devices) { + for (int t = 0; t < d.count; t++) { + tablet.add_timestamp(row, static_cast(d.start + t)); + tablet.add_value(row, "region", d.region.c_str()); + tablet.add_value(row, "device", d.device.c_str()); + tablet.add_value(row, "val", static_cast(d.start + t)); + row++; + } + } + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + ResultSet* tmp_result_set = nullptr; + Filter* tag_filter = + TagFilterBuilder(table_schema).eq("device", "dev_c"); + std::vector cols = {"region", "device", "val"}; + int ret = reader.query("multi_tag_table", cols, 0, 1000000, + tmp_result_set, tag_filter); + ASSERT_EQ(ret, common::E_OK); + auto* table_result_set = (TableResultSet*)tmp_result_set; + + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + row_num++; + } + ASSERT_EQ(row_num, 5); + reader.destroy_query_data_set(table_result_set); ASSERT_EQ(reader.close(), common::E_OK); delete table_schema; From 4b34bf507f780349c994e64f2e84e691ca337b7f Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Sun, 10 May 2026 15:27:27 +0800 Subject: [PATCH 10/10] style: clang-format --- .../reader/table_view/tsfile_reader_table_test.cc | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 21744b5eb..e55f34c2a 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -1181,10 +1181,10 @@ TEST_F(TsFileTableReaderTest, MultiTagColumnFilterOnSecondTag) { }; int total = 20; - storage::Tablet tablet( - table_schema->get_table_name(), table_schema->get_measurement_names(), - table_schema->get_data_types(), table_schema->get_column_categories(), - 
total); + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), total); int row = 0; for (auto& d : devices) { for (int t = 0; t < d.count; t++) { @@ -1203,11 +1203,10 @@ TEST_F(TsFileTableReaderTest, MultiTagColumnFilterOnSecondTag) { ASSERT_EQ(reader.open(file_name_), common::E_OK); ResultSet* tmp_result_set = nullptr; - Filter* tag_filter = - TagFilterBuilder(table_schema).eq("device", "dev_c"); + Filter* tag_filter = TagFilterBuilder(table_schema).eq("device", "dev_c"); std::vector cols = {"region", "device", "val"}; - int ret = reader.query("multi_tag_table", cols, 0, 1000000, - tmp_result_set, tag_filter); + int ret = reader.query("multi_tag_table", cols, 0, 1000000, tmp_result_set, + tag_filter); ASSERT_EQ(ret, common::E_OK); auto* table_result_set = (TableResultSet*)tmp_result_set;