Skip to content

Commit b8db982

Browse files
committed
tmp code.
1 parent b4b524d commit b8db982

File tree

5 files changed

+48
-54
lines changed

5 files changed

+48
-54
lines changed

cpp/src/common/allocator/byte_stream.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -305,14 +305,14 @@ class ByteStream {
305305
void clear_wrapped_buf() { wrapped_page_.buf_ = nullptr; }
306306

307307
/* ================ Part 1: basic ================ */
308-
FORCE_INLINE uint32_t remaining_size() const {
308+
FORCE_INLINE int64_t remaining_size() const {
309309
ASSERT(total_size_.load() >= read_pos_);
310310
return total_size_.load() - read_pos_;
311311
}
312312
FORCE_INLINE bool has_remaining() const { return remaining_size() > 0; }
313313

314314
FORCE_INLINE void mark_read_pos() { marked_read_pos_ = read_pos_; }
315-
FORCE_INLINE uint32_t get_mark_len() const {
315+
FORCE_INLINE int64_t get_mark_len() const {
316316
ASSERT(marked_read_pos_ <= read_pos_);
317317
return read_pos_ - marked_read_pos_;
318318
}
@@ -345,8 +345,8 @@ class ByteStream {
345345
this->total_size_.store(other.total_size_.load());
346346
}
347347

348-
FORCE_INLINE uint32_t total_size() const { return total_size_.load(); }
349-
FORCE_INLINE uint32_t read_pos() const { return read_pos_; };
348+
FORCE_INLINE int64_t total_size() const { return total_size_.load(); }
349+
FORCE_INLINE int64_t read_pos() const { return read_pos_; };
350350
FORCE_INLINE void wrapped_buf_advance_read_pos(uint32_t size) {
351351
if (size + read_pos_ > total_size_.load()) {
352352
read_pos_ = total_size_.load();
@@ -526,7 +526,7 @@ class ByteStream {
526526

527527
// get tail position <tail_, total_size_> atomically
528528
Page *host_end = nullptr;
529-
uint32_t host_total_size = 0;
529+
int64_t host_total_size = 0;
530530
while (true) {
531531
host_end = host_.tail_.load();
532532
host_total_size = host_.total_size_.load();
@@ -642,10 +642,10 @@ class ByteStream {
642642
OptionalAtomic<Page *> head_;
643643
OptionalAtomic<Page *> tail_;
644644
Page *read_page_; // only one thread is allow to reader this ByteStream
645-
OptionalAtomic<uint32_t> total_size_; // total size in byte
646-
uint32_t read_pos_; // current reader position
647-
uint32_t marked_read_pos_; // current reader position
648-
uint32_t page_size_;
645+
OptionalAtomic<int64_t> total_size_; // total size in byte
646+
int64_t read_pos_; // current reader position
647+
int64_t marked_read_pos_; // current reader position
648+
int64_t page_size_;
649649
AllocModID mid_;
650650
Page wrapped_page_;
651651
};

cpp/src/reader/aligned_chunk_reader.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ class AlignedChunkReader : public IChunkReader {
7676
int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter,
7777
common::PageArena &pa) override;
7878

79+
bool should_skip(Filter *filter) override {
80+
if (filter != nullptr && time_chunk_meta_ != nullptr &&
81+
time_chunk_meta_->statistic_ != nullptr &&
82+
!filter->satisfy(time_chunk_meta_->statistic_)) {
83+
return true;
84+
}
85+
return false;
86+
}
87+
7988
private:
8089
FORCE_INLINE bool chunk_has_only_one_page(
8190
const ChunkHeader &chunk_header) const {

cpp/src/reader/ichunk_reader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class IChunkReader {
5353
}
5454

5555
virtual ChunkHeader &get_chunk_header() { return chunk_header_; }
56+
virtual bool should_skip(Filter* filter) { return false; }
5657

5758
protected:
5859
ChunkHeader chunk_header_;

cpp/src/reader/tsfile_series_scan_iterator.cc

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,31 +46,13 @@ int TsFileSeriesScanIterator::get_next(TsBlock *&ret_tsblock, bool alloc,
4646
ret_tsblock = alloc_tsblock();
4747
}
4848

49-
if (chunk_reader_->has_more_data()) {
50-
ChunkMeta* cm = nullptr;
51-
ChunkMeta* time_cm = nullptr;
52-
ChunkMeta* value_cm = nullptr;
53-
54-
if (!is_aligned_) {
55-
if (chunk_meta_cursor_ == nullptr) {
56-
cm = nullptr;
57-
} else {
58-
cm = get_current_chunk_meta();
59-
}
60-
} else {
61-
time_cm = time_chunk_meta_cursor_ == nullptr ? nullptr : time_chunk_meta_cursor_.get();
62-
cm = time_cm;
63-
}
64-
65-
if (filter != nullptr && cm != nullptr && cm->statistic_ != nullptr && !filter->satisfy(cm->statistic_)) {
66-
chunk_reader_->reset();
67-
}
49+
if (chunk_reader_->should_skip(filter)) {
50+
chunk_reader_->reset();
6851
}
6952

7053
while (true) {
7154
if (!chunk_reader_->has_more_data()) {
7255
while (true) {
73-
advance_to_next_chunk();
7456
if (!has_next_chunk()) {
7557
return E_NO_MORE_DATA;
7658
}
@@ -84,6 +66,7 @@ int TsFileSeriesScanIterator::get_next(TsBlock *&ret_tsblock, bool alloc,
8466
value_cm = value_chunk_meta_cursor_.get();
8567
cm = time_cm;
8668
}
69+
advance_to_next_chunk();
8770
if (filter != nullptr && cm->statistic_ != nullptr && !filter->satisfy(cm->statistic_)) {
8871
continue;
8972
}

cpp/test/writer/table_view/tsfile_writer_table_test.cc

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -169,31 +169,32 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) {
169169
// tsfile_table_writer->flush();
170170
// tsfile_table_writer->close();
171171

172-
TsFileReader reader = TsFileReader();
173-
reader.open("/Users/colin/dev/tsfile/cpp/timebench_0.tsfile");
174-
ResultSet* ret = nullptr;
175-
auto start = std::chrono::high_resolution_clock::now();
176-
int ret_value = reader.query("timebench", {"value"}, 717840000, 717840000+1024, ret);
177-
ASSERT_EQ(common::E_OK, ret_value);
178-
auto* table_result_set = (TableResultSet*)ret;
179-
bool has_next = false;
180-
int cur_line = 0;
181-
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
182-
cur_line++;
183-
}
184-
// 记录结束时间点
185-
auto end = std::chrono::high_resolution_clock::now();
186-
187-
// 计算耗时(微秒)
188-
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
189-
190-
// 输出结果
191-
std::cout << "耗时: " << duration.count() * 1.0 /1000/1000 << "" << std::endl;
192-
ASSERT_EQ(cur_line, 1025);
193-
table_result_set->close();
194-
reader.destroy_query_data_set(table_result_set);
195-
196-
reader.close();
172+
// TsFileReader reader = TsFileReader();
173+
// reader.open("/Users/colin/dev/tsfile/cpp/timebench_0.tsfile");
174+
// ResultSet* ret = nullptr;
175+
// auto start = std::chrono::high_resolution_clock::now();
176+
// int ret_value = reader.query("timebench", {"value"}, 0, 1+1024, ret);
177+
// ASSERT_EQ(common::E_OK, ret_value);
178+
// auto* table_result_set = (TableResultSet*)ret;
179+
// bool has_next = false;
180+
// int cur_line = 0;
181+
// while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
182+
// cur_line++;
183+
// std::cout<<table_result_set->get_value<int64_t>(1);
184+
// }
185+
// // 记录结束时间点
186+
// auto end = std::chrono::high_resolution_clock::now();
187+
//
188+
// // 计算耗时(微秒)
189+
// auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
190+
//
191+
// // 输出结果
192+
// std::cout << "耗时: " << duration.count() * 1.0 /1000/1000 << " 秒" << std::endl;
193+
// ASSERT_EQ(cur_line, 1026);
194+
// table_result_set->close();
195+
// reader.destroy_query_data_set(table_result_set);
196+
//
197+
// reader.close();
197198
}
198199

199200
TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {

0 commit comments

Comments
 (0)