Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/clucene
Submodule clucene added at bb2224
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ DEFINE_mInt64(big_column_size_buffer, "65535");
DEFINE_mInt64(small_column_size_buffer, "100");

// Perform the always_true check at intervals determined by runtime_filter_sampling_frequency
DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
DEFINE_mInt32(runtime_filter_sampling_frequency, "32");
DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
DEFINE_mBool(execution_ignore_eovercrowded, "true");
// cooldown task configs
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/inverted_index_iterator.h"
#include "runtime/define_primitive_type.h"
#include "runtime_filter/runtime_filter_selectivity.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -372,8 +373,8 @@ class ColumnPredicate {
if (!_always_true) {
_judge_filter_rows += filter_rows;
_judge_input_rows += input_rows;
vectorized::VRuntimeFilterWrapper::judge_selectivity(
get_ignore_threshold(), _judge_filter_rows, _judge_input_rows, _always_true);
RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(), _judge_filter_rows,
_judge_input_rows, _always_true);
}
}

Expand Down
182 changes: 123 additions & 59 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ Status ColumnReader::_parse_zone_map(const ZoneMapPB& zone_map, WrapperField* mi
}
} else {
RETURN_IF_ERROR(min_value_container->from_string(zone_map.min()));
min_value_container->set_not_null();
}

if (zone_map.has_nan()) {
Expand All @@ -545,6 +546,7 @@ Status ColumnReader::_parse_zone_map(const ZoneMapPB& zone_map, WrapperField* mi
}
} else {
RETURN_IF_ERROR(max_value_container->from_string(zone_map.max()));
max_value_container->set_not_null();
}
}
// for compatibility with the original Cond eval logic
Expand Down Expand Up @@ -1636,6 +1638,9 @@ Status FileColumnIterator::_seek_to_pos_in_page(ParsedPage* page, ordinal_t offs
}

auto num_nulls = [this](ordinal_t start, ordinal_t end) {
if (_page.is_continue) {
return 0;
}
auto null_count = 0;
for (auto i = start; i < end; i++) {
null_count += _page.null_maps[i];
Expand Down Expand Up @@ -1712,21 +1717,38 @@ Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& d
auto& null_map = null_col->get_null_map_data();
auto nest_column = null_col->get_nested_column_ptr();

while (nrows_to_read > 0) {
bool is_null;
int i;
std::tie(is_null, i) = null_count(nrows_to_read);
if (is_null) {
null_col->insert_many_defaults(i);
} else {
null_map.resize_fill(null_map.size() + i, 0);
size_t num_rows = i;
RETURN_IF_ERROR(_page.data_decoder->next_batch(&num_rows, nest_column));
DCHECK_EQ(i, num_rows);
// Optimization: if is_continue is true, skip null_count check and read directly
if (_page.is_continue) {
// Copy null values from page's null_maps
size_t start_offset = _page.offset_in_page;
// Reserve space for new null values
null_map.resize(null_map.size() + nrows_to_read);
// Copy from page's null_maps to column's null_map using memcpy for better performance
memcpy(null_map.data() + null_map.size() - nrows_to_read,
_page.null_maps.data() + start_offset, nrows_to_read);

size_t num_rows = nrows_to_read;
RETURN_IF_ERROR(_page.data_decoder->next_batch(&num_rows, nest_column));
DCHECK_EQ(nrows_to_read, num_rows);
_page.offset_in_page += nrows_to_read;
_current_ordinal += nrows_to_read;
} else {
while (nrows_to_read > 0) {
bool is_null;
int i;
std::tie(is_null, i) = null_count(nrows_to_read);
if (is_null) {
null_col->insert_many_defaults(i);
} else {
null_map.resize_fill(null_map.size() + i, 0);
size_t num_rows = i;
RETURN_IF_ERROR(_page.data_decoder->next_batch(&num_rows, nest_column));
DCHECK_EQ(i, num_rows);
}
nrows_to_read -= i;
_page.offset_in_page += i;
_current_ordinal += i;
}
nrows_to_read -= i;
_page.offset_in_page += i;
_current_ordinal += i;
}
} else {
RETURN_IF_ERROR(_page.data_decoder->next_batch(&nrows_to_read, dst));
Expand Down Expand Up @@ -1760,65 +1782,107 @@ Status FileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t co
nrows_to_read = std::min(remaining, _page.remaining());

if (!_page.null_maps.empty()) {
size_t already_read = 0;
while ((nrows_to_read - already_read) > 0) {
bool is_null = false;
size_t this_run = std::min(nrows_to_read - already_read, _page.remaining());
if (UNLIKELY(this_run == 0)) {
break;
}
std::tie(is_null, this_run) = null_count(this_run);
size_t offset = total_read_count + already_read;
auto* null_col =
vectorized::check_and_get_column<vectorized::ColumnNullable>(dst.get());
if (UNLIKELY(null_col == nullptr)) {
return Status::InternalError("unexpected column type in column reader");
}
auto& null_map = null_col->get_null_map_data();
auto nest_column = null_col->get_nested_column_ptr();

// Optimization: if is_continue is true, skip null_count check and read directly
if (_page.is_continue) {
size_t offset = total_read_count;
size_t this_read_count = 0;
rowid_t current_ordinal_in_page =
cast_set<uint32_t>(_page.offset_in_page + _page.first_ordinal);
for (size_t i = 0; i < this_run; ++i) {
if (rowids[offset + i] - current_ordinal_in_page >= this_run) {
// rowid_t current_ordinal_in_page =
// cast_set<uint32_t>(_page.offset_in_page + _page.first_ordinal);

// Calculate how many rowids in this batch belong to the current page
for (size_t i = 0; i < nrows_to_read; ++i) {
// Check if this rowid is within the current page's range
if (rowids[offset + i] >= _page.first_ordinal + _page.num_rows) {
break;
}
this_read_count++;
}

auto origin_index = _page.data_decoder->current_index();
if (this_read_count > 0) {
auto* null_col =
vectorized::check_and_get_column<vectorized::ColumnNullable>(dst.get());
if (UNLIKELY(null_col == nullptr)) {
return Status::InternalError("unexpected column type in column reader");
}
auto& null_map = null_col->get_null_map_data();
auto nest_column = null_col->get_nested_column_ptr();
// Read data for the rows
size_t read_count = this_read_count;

if (is_null) {
null_col->insert_many_defaults(this_read_count);
} else {
size_t read_count = this_read_count;

// Ordinals in a nullable column's data buffer may not be contiguous (the data doesn't contain null values),
// so we need to use `page_start_off_in_decoder` to calculate the actual offset in `data_decoder`
size_t page_start_off_in_decoder =
_page.first_ordinal + _page.offset_in_page - origin_index;
RETURN_IF_ERROR(_page.data_decoder->read_by_rowids(
&rowids[offset], page_start_off_in_decoder, &read_count,
nest_column));
null_map.resize_fill(null_map.size() + read_count, 0);
DCHECK_EQ(read_count, this_read_count);
// Read data using data_decoder's read_by_rowids
RETURN_IF_ERROR(_page.data_decoder->read_by_rowids(
&rowids[offset], _page.first_ordinal, &read_count, nest_column));

// Update null map with data from page's null_maps
size_t null_map_start_offset = null_map.size();
null_map.resize(null_map.size() + read_count);

// Copy null flags from page's null_maps for the rows we just read
for (size_t i = 0; i < read_count; ++i) {
size_t idx_in_page = rowids[offset + i] - _page.first_ordinal;
null_map[null_map_start_offset + i] = _page.null_maps[idx_in_page];
}
DCHECK_EQ(read_count, this_read_count);
}

if (!is_null) {
RETURN_IF_ERROR(
_page.data_decoder->seek_to_position_in_page(origin_index + this_run));
DCHECK_EQ(nest_column->size(), null_map.size());
total_read_count += this_read_count;
remaining -= this_read_count;
} else {
// Original logic for non-continue case
size_t already_read = 0;
while ((nrows_to_read - already_read) > 0) {
bool is_null = false;
size_t this_run = std::min(nrows_to_read - already_read, _page.remaining());
if (UNLIKELY(this_run == 0)) {
break;
}
std::tie(is_null, this_run) = null_count(this_run);
size_t offset = total_read_count + already_read;
size_t this_read_count = 0;
rowid_t current_ordinal_in_page =
cast_set<uint32_t>(_page.offset_in_page + _page.first_ordinal);
for (size_t i = 0; i < this_run; ++i) {
if (rowids[offset + i] - current_ordinal_in_page >= this_run) {
break;
}
this_read_count++;
}

auto origin_index = _page.data_decoder->current_index();
if (this_read_count > 0) {
if (is_null) {
null_col->insert_many_defaults(this_read_count);
} else {
size_t read_count = this_read_count;

// Ordinals in a nullable column's data buffer may not be contiguous (the data doesn't contain null values),
// so we need to use `page_start_off_in_decoder` to calculate the actual offset in `data_decoder`
size_t page_start_off_in_decoder =
_page.first_ordinal + _page.offset_in_page - origin_index;
RETURN_IF_ERROR(_page.data_decoder->read_by_rowids(
&rowids[offset], page_start_off_in_decoder, &read_count,
nest_column));
null_map.resize_fill(null_map.size() + read_count, 0);
DCHECK_EQ(read_count, this_read_count);
}
}

if (!is_null) {
RETURN_IF_ERROR(_page.data_decoder->seek_to_position_in_page(origin_index +
this_run));
}

already_read += this_read_count;
_page.offset_in_page += this_run;
DCHECK(_page.offset_in_page <= _page.num_rows);
}

already_read += this_read_count;
_page.offset_in_page += this_run;
DCHECK(_page.offset_in_page <= _page.num_rows);
nrows_to_read = already_read;
total_read_count += nrows_to_read;
remaining -= nrows_to_read;
}

nrows_to_read = already_read;
total_read_count += nrows_to_read;
remaining -= nrows_to_read;
} else {
RETURN_IF_ERROR(_page.data_decoder->read_by_rowids(
&rowids[total_read_count], _page.first_ordinal, &nrows_to_read, dst));
Expand Down
Loading
Loading