2525using namespace common ;
2626namespace storage {
2727
28- int AlignedChunkReader::init (ReadFile * read_file, String m_name,
29- TSDataType data_type, Filter * time_filter) {
28+ int AlignedChunkReader::init (ReadFile* read_file, String m_name,
29+ TSDataType data_type, Filter* time_filter) {
3030 read_file_ = read_file;
3131 measurement_name_.shallow_copy_from (m_name);
3232 time_decoder_ = DecoderFactory::alloc_time_decoder ();
@@ -50,7 +50,7 @@ void AlignedChunkReader::reset() {
5050 cur_time_page_header_.reset ();
5151 cur_value_page_header_.reset ();
5252
53- char * file_data_buf = time_in_stream_.get_wrapped_buf ();
53+ char * file_data_buf = time_in_stream_.get_wrapped_buf ();
5454 if (file_data_buf != nullptr ) {
5555 mem_free (file_data_buf);
5656 }
@@ -87,7 +87,7 @@ void AlignedChunkReader::destroy() {
8787 CompressorFactory::free (value_compressor_);
8888 value_compressor_ = nullptr ;
8989 }
90- char * buf = time_in_stream_.get_wrapped_buf ();
90+ char * buf = time_in_stream_.get_wrapped_buf ();
9191 if (buf != nullptr ) {
9292 mem_free (buf);
9393 time_in_stream_.clear_wrapped_buf ();
@@ -102,8 +102,8 @@ void AlignedChunkReader::destroy() {
102102 chunk_header_.~ChunkHeader ();
103103}
104104
105- int AlignedChunkReader::load_by_aligned_meta (ChunkMeta * time_chunk_meta,
106- ChunkMeta * value_chunk_meta) {
105+ int AlignedChunkReader::load_by_aligned_meta (ChunkMeta* time_chunk_meta,
106+ ChunkMeta* value_chunk_meta) {
107107 int ret = E_OK;
108108 time_chunk_meta_ = time_chunk_meta;
109109 value_chunk_meta_ = value_chunk_meta;
@@ -116,8 +116,8 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
116116 file_data_time_buf_size_ = 1024 ;
117117 file_data_value_buf_size_ = 1024 ;
118118 int32_t ret_read_len = 0 ;
119- char * time_file_data_buf =
120- (char *)mem_alloc (file_data_time_buf_size_, MOD_CHUNK_READER);
119+ char * time_file_data_buf =
120+ (char *)mem_alloc (file_data_time_buf_size_, MOD_CHUNK_READER);
121121 if (IS_NULL (time_file_data_buf)) {
122122 return E_OOM;
123123 }
@@ -140,8 +140,8 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
140140 }
141141 /* ================ deserialize value_chunk_header ================*/
142142 ret_read_len = 0 ;
143- char * value_file_data_buf =
144- (char *)mem_alloc (file_data_value_buf_size_, MOD_CHUNK_READER);
143+ char * value_file_data_buf =
144+ (char *)mem_alloc (file_data_value_buf_size_, MOD_CHUNK_READER);
145145 if (IS_NULL (value_file_data_buf)) {
146146 return E_OOM;
147147 }
@@ -182,7 +182,7 @@ int AlignedChunkReader::load_by_aligned_meta(ChunkMeta *time_chunk_meta,
182182}
183183
184184int AlignedChunkReader::alloc_compressor_and_decoder (
185- storage::Decoder *& decoder, storage::Compressor *& compressor,
185+ storage::Decoder*& decoder, storage::Compressor*& compressor,
186186 TSEncoding encoding, TSDataType data_type, CompressionType compression) {
187187 if (decoder != nullptr ) {
188188 decoder->reset ();
@@ -204,10 +204,10 @@ int AlignedChunkReader::alloc_compressor_and_decoder(
204204 return E_OK;
205205}
206206
207- int AlignedChunkReader::get_next_page (TsBlock * ret_tsblock,
208- Filter * oneshoot_filter, PageArena & pa) {
207+ int AlignedChunkReader::get_next_page (TsBlock* ret_tsblock,
208+ Filter* oneshoot_filter, PageArena& pa) {
209209 int ret = E_OK;
210- Filter * filter =
210+ Filter* filter =
211211 (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
212212 if (prev_time_page_not_finish () && prev_value_page_not_finish ()) {
213213 ret = decode_time_value_buf_into_tsblock (ret_tsblock, oneshoot_filter,
@@ -243,11 +243,11 @@ int AlignedChunkReader::get_next_page(TsBlock *ret_tsblock,
243243 return ret;
244244}
245245
246- int AlignedChunkReader::get_cur_page_header (ChunkMeta *& chunk_meta,
247- common::ByteStream & in_stream,
248- PageHeader & cur_page_header,
249- uint32_t & chunk_visit_offset,
250- ChunkHeader & chunk_header) {
246+ int AlignedChunkReader::get_cur_page_header (ChunkMeta*& chunk_meta,
247+ common::ByteStream& in_stream,
248+ PageHeader& cur_page_header,
249+ uint32_t & chunk_visit_offset,
250+ ChunkHeader& chunk_header) {
251251 int ret = E_OK;
252252 bool retry = true ;
253253 int cur_page_header_serialized_size = 0 ;
@@ -263,7 +263,7 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
263263 if (deserialize_buf_not_enough (ret) && retry) {
264264 retry = false ;
265265 retry_read_want_size += 1024 ;
266- int32_t & file_data_buf_size =
266+ int32_t & file_data_buf_size =
267267 chunk_header.data_type_ == common::VECTOR
268268 ? file_data_time_buf_size_
269269 : file_data_value_buf_size_;
@@ -295,18 +295,18 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
295295// reader at least @want_size bytes from file and wrap the buffer into
296296// @in_stream_
297297int AlignedChunkReader::read_from_file_and_rewrap (
298- common::ByteStream & in_stream_, ChunkMeta *& chunk_meta,
299- uint32_t & chunk_visit_offset, int32_t & file_data_buf_size, int want_size,
298+ common::ByteStream& in_stream_, ChunkMeta*& chunk_meta,
299+ uint32_t & chunk_visit_offset, int32_t & file_data_buf_size, int want_size,
300300 bool may_shrink) {
301301 int ret = E_OK;
302302 const int DEFAULT_READ_SIZE = 4096 ; // may use page_size + page_header_size
303- char * file_data_buf = in_stream_.get_wrapped_buf ();
303+ char * file_data_buf = in_stream_.get_wrapped_buf ();
304304 int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
305305 int read_size =
306306 (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
307307 if (file_data_buf_size < read_size ||
308308 (may_shrink && read_size < file_data_buf_size / 10 )) {
309- file_data_buf = (char *)mem_realloc (file_data_buf, read_size);
309+ file_data_buf = (char *)mem_realloc (file_data_buf, read_size);
310310 if (IS_NULL (file_data_buf)) {
311311 return E_OOM;
312312 }
@@ -326,7 +326,7 @@ int AlignedChunkReader::read_from_file_and_rewrap(
326326 return ret;
327327}
328328
329- bool AlignedChunkReader::cur_page_statisify_filter (Filter * filter) {
329+ bool AlignedChunkReader::cur_page_statisify_filter (Filter* filter) {
330330 bool value_satisfy = filter == nullptr ||
331331 cur_value_page_header_.statistic_ == nullptr ||
332332 filter->satisfy (cur_value_page_header_.statistic_ );
@@ -364,8 +364,8 @@ int AlignedChunkReader::decode_cur_time_page_data() {
364364 }
365365 }
366366
367- char * time_compressed_buf = nullptr ;
368- char * time_uncompressed_buf = nullptr ;
367+ char * time_compressed_buf = nullptr ;
368+ char * time_uncompressed_buf = nullptr ;
369369 uint32_t time_compressed_buf_size = 0 ;
370370 uint32_t time_uncompressed_buf_size = 0 ;
371371
@@ -376,7 +376,7 @@ int AlignedChunkReader::decode_cur_time_page_data() {
376376#ifdef DEBUG_SE
377377 std::cout << " AlignedChunkReader::decode_cur_page_data,time_in_stream_."
378378 " get_wrapped_buf="
379- << (void *)(time_in_stream_.get_wrapped_buf ())
379+ << (void *)(time_in_stream_.get_wrapped_buf ())
380380 << " , time_in_stream_.read_pos=" << time_in_stream_.read_pos ()
381381 << std::endl;
382382#endif
@@ -427,11 +427,11 @@ int AlignedChunkReader::decode_cur_value_page_data() {
427427 }
428428 }
429429
430- char * value_compressed_buf = nullptr ;
431- char * value_uncompressed_buf = nullptr ;
430+ char * value_compressed_buf = nullptr ;
431+ char * value_uncompressed_buf = nullptr ;
432432 uint32_t value_compressed_buf_size = 0 ;
433433 uint32_t value_uncompressed_buf_size = 0 ;
434- char * value_buf = nullptr ;
434+ char * value_buf = nullptr ;
435435 uint32_t value_buf_size = 0 ;
436436
437437 // Step 2: do uncompress
@@ -467,7 +467,7 @@ int AlignedChunkReader::decode_cur_value_page_data() {
467467 SerializationUtil::read_ui32 (value_uncompressed_buf);
468468 value_uncompressed_buf_offset += sizeof (uint32_t );
469469 value_page_col_notnull_bitmap_.resize ((value_page_data_num_ + 7 ) / 8 );
470- for (unsigned char & i : value_page_col_notnull_bitmap_) {
470+ for (unsigned char & i : value_page_col_notnull_bitmap_) {
471471 i = *(value_uncompressed_buf + value_uncompressed_buf_offset);
472472 value_uncompressed_buf_offset++;
473473 }
@@ -486,7 +486,7 @@ int AlignedChunkReader::decode_cur_value_page_data() {
486486}
487487
488488int AlignedChunkReader::decode_time_value_buf_into_tsblock (
489- TsBlock *& ret_tsblock, Filter * filter, common::PageArena * pa) {
489+ TsBlock*& ret_tsblock, Filter* filter, common::PageArena* pa) {
490490 int ret = common::E_OK;
491491 ret = decode_tv_buf_into_tsblock_by_datatype (time_in_, value_in_,
492492 ret_tsblock, filter, pa);
@@ -535,7 +535,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
535535 ret = E_OVERFLOW; \
536536 break ; \
537537 } \
538- row_appender.append (0 , (char *)&time, sizeof (time)); \
538+ row_appender.append (0 , (char *)&time, sizeof (time)); \
539539 row_appender.append_null (1 ); \
540540 continue ; \
541541 } \
@@ -552,15 +552,15 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
552552 } else { \
553553 /* std::cout << "decoder: time=" << time << ", value=" << value \
554554 * << std::endl;*/ \
555- row_appender.append (0 , (char *)&time, sizeof (time)); \
556- row_appender.append (1 , (char *)&value, sizeof (value)); \
555+ row_appender.append (0 , (char *)&time, sizeof (time)); \
556+ row_appender.append (1 , (char *)&value, sizeof (value)); \
557557 } \
558558 } \
559559 } while (false )
560560
561561int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK (
562- ByteStream & time_in, ByteStream & value_in, RowAppender & row_appender,
563- Filter * filter) {
562+ ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender,
563+ Filter* filter) {
564564 int ret = E_OK;
565565 uint32_t mask = 1 << 7 ;
566566 int64_t time = 0 ;
@@ -578,7 +578,7 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
578578 ret = E_OVERFLOW;
579579 break ;
580580 }
581- row_appender.append (0 , (char *)&time, sizeof (time));
581+ row_appender.append (0 , (char *)&time, sizeof (time));
582582 row_appender.append_null (1 );
583583 continue ;
584584 }
@@ -594,16 +594,16 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
594594 } else {
595595 /* std::cout << "decoder: time=" << time << ", value=" << value
596596 * << std::endl;*/
597- row_appender.append (0 , (char *)&time, sizeof (time));
598- row_appender.append (1 , (char *)&value, sizeof (value));
597+ row_appender.append (0 , (char *)&time, sizeof (time));
598+ row_appender.append (1 , (char *)&value, sizeof (value));
599599 }
600600 }
601601 return ret;
602602}
603603
604604int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype (
605- ByteStream & time_in, ByteStream & value_in, TsBlock * ret_tsblock,
606- Filter * filter, common::PageArena * pa) {
605+ ByteStream& time_in, ByteStream& value_in, TsBlock* ret_tsblock,
606+ Filter* filter, common::PageArena* pa) {
607607 int ret = E_OK;
608608 RowAppender row_appender (ret_tsblock);
609609 switch (value_chunk_header_.data_type_ ) {
@@ -648,24 +648,37 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
648648}
649649
650650int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK (
651- ByteStream & time_in, ByteStream & value_in, RowAppender & row_appender,
652- PageArena & pa, Filter * filter) {
651+ ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender,
652+ PageArena& pa, Filter* filter) {
653653 int ret = E_OK;
654654 int64_t time = 0 ;
655655 common::String value;
656- while (time_decoder_->has_remaining (time_in)) {
657- ASSERT (value_decoder_->has_remaining (value_in));
656+ uint32_t mask = 1 << 7 ;
657+ while (time_decoder_->has_remaining (time_in) &&
658+ value_decoder_->has_remaining (value_in)) {
659+ cur_value_index++;
660+ bool should_read_data = true ;
661+ if (((value_page_col_notnull_bitmap_[cur_value_index / 8 ] & 0xFF ) &
662+ (mask >> (cur_value_index % 8 ))) == 0 ) {
663+ should_read_data = false ;
664+ }
658665 if (UNLIKELY (!row_appender.add_row ())) {
659666 ret = E_OVERFLOW;
667+ cur_value_index--;
660668 break ;
661669 } else if (RET_FAIL (time_decoder_->read_int64 (time, time_in))) {
662- } else if (RET_FAIL (value_decoder_->read_String (value, pa, value_in))) {
670+ } else if (should_read_data &&
671+ RET_FAIL (value_decoder_->read_String (value, pa, value_in))) {
663672 } else if (filter != nullptr && !filter->satisfy (time, value)) {
664673 row_appender.backoff_add_row ();
665674 continue ;
666675 } else {
667- row_appender.append (0 , (char *)&time, sizeof (time));
668- row_appender.append (1 , value.buf_ , value.len_ );
676+ row_appender.append (0 , (char *)&time, sizeof (time));
677+ if (!should_read_data) {
678+ row_appender.append_null (1 );
679+ } else {
680+ row_appender.append (1 , value.buf_ , value.len_ );
681+ }
669682 }
670683 }
671684 return ret;
0 commit comments